如何关闭其他线程拥有的套接字

问题描述 投票:0回答:1

在我的应用程序中，我使用 unix 域套接字来执行本地通信，并且为此使用了 `uds` crate。我需要关闭套接字（有选择地关闭侦听器和客户端），但根据我当前的代码设计，我无法做到这一点。有人可以帮我吗？

main.rs

mod uds_listener;
mod uds_client;

use std::thread::JoinHandle;

use uds_listener::UdsListener;
use uds_client::UdsClient;
    
/// Starts 256 listener threads and 256 client threads, one pair per socket
/// path `/tmp/sock_<i>.sock`, then waits for every thread to finish.
fn main() {
    // Bind each listener and immediately move it onto its own thread.
    let listener_handles: Vec<JoinHandle<()>> = (0..256)
        .map(|i| UdsListener::new(&format!("/tmp/sock_{i}.sock")).spawn_thread())
        .collect();

    // Connect one client per socket path, again one thread per client.
    let client_handles: Vec<JoinHandle<()>> = (0..256)
        .map(|i| UdsClient::new(&format!("/tmp/sock_{i}.sock")).spawn_thread())
        .collect();

    // Still unsolved here: the worker threads block while receiving and
    // nothing asks them to stop. Shutting a socket down (read and write) would
    // make the receive return length 0, but each socket is owned by its worker
    // thread, so this function has no way to reach it.
    println!("waiting for listener threads to join");
    // Join most-recently-spawned first.
    for handle in listener_handles.into_iter().rev() {
        let _ = handle.join();
    }

    println!("waiting for client threads to join");
    for handle in client_handles.into_iter().rev() {
        let _ = handle.join();
    }
}

uds_listener.rs

use std::thread;

use uds::UnixSeqpacketListener;

/// A bound Unix seqpacket listener that is served on a dedicated thread.
pub struct UdsListener {
    listener: UnixSeqpacketListener,
}

impl UdsListener {
    /// Creates a listener bound to `sock_name`.
    ///
    /// # Panics
    ///
    /// Panics if the bind fails.
    pub fn new(sock_name: &str) -> Self {
        let listener = Self::bind(sock_name);
        Self { listener }
    }

    /// Binds to `sock_name`, panicking with the socket name and error on failure.
    fn bind(sock_name: &str) -> UnixSeqpacketListener {
        UnixSeqpacketListener::bind(sock_name)
            .unwrap_or_else(|e| panic!("uds listener bind failed for {sock_name}: {e:?}"))
    }

    /// Moves the listener onto a new thread and returns that thread's handle.
    pub fn spawn_thread(mut self) -> thread::JoinHandle<()> {
        thread::spawn(move || self.run())
    }

    /// Accepts a single connection and receives from it until a receive
    /// reports length 0.
    ///
    /// Returns immediately if the accept fails; panics if a receive fails.
    fn run(&mut self) {
        let socket = match self.listener.accept_unix_addr() {
            Ok((socket, _)) => socket,
            Err(_) => return,
        };

        let mut packet = [0u8; 8192];
        loop {
            let received = socket
                .recv(&mut packet)
                .unwrap_or_else(|e| panic!("uds listener read failed: {e:?}"));
            // A zero-length receive is the signal to stop.
            if received == 0 {
                return;
            }
        }
    }
}

uds_client.rs

use std::{time, thread};

use uds::UnixSeqpacketConn;

/// A connected Unix seqpacket client that is served on a dedicated thread.
pub struct UdsClient {
    socket: UnixSeqpacketConn,
}

impl UdsClient {
    /// Creates a client connected to `sock_name`.
    ///
    /// Does not return until a connection attempt succeeds.
    pub fn new(sock_name: &str) -> Self {
        let socket = Self::connect(sock_name);
        Self { socket }
    }

    /// Tries to connect to `sock_name`, sleeping one second after every
    /// failed attempt, until an attempt succeeds.
    fn connect(sock_name: &str) -> UnixSeqpacketConn {
        loop {
            if let Ok(socket) = UnixSeqpacketConn::connect(sock_name) {
                return socket;
            }
            thread::sleep(time::Duration::from_secs(1));
        }
    }

    /// Moves the client onto a new thread and returns that thread's handle.
    pub fn spawn_thread(mut self) -> thread::JoinHandle<()> {
        thread::spawn(move || self.run())
    }

    /// Receives from the socket until a receive reports length 0.
    ///
    /// Panics if a receive fails.
    fn run(&mut self) {
        let mut packet = [0u8; 8192];
        loop {
            let received = self
                .socket
                .recv(&mut packet)
                .unwrap_or_else(|e| panic!("uds client read failed: {e:?}"));
            // A zero-length receive is the signal to stop.
            if received == 0 {
                return;
            }
        }
    }
}

cargo.toml

[package]
name = "udss"
version = "0.1.0"
edition = "2021"

[dependencies]
uds = "0.4.2"
rust unix-socket
1个回答
0
投票

这是我在评论中提到的一个简化示例，可以在 Rust Playground 中运行。

基本上，套接字不再由客户端/侦听器独占，而是被包装进 `Arc<RwLock<>>`，从而可以在线程之间共享。特别是，主线程也持有这个 `Arc`，可以通过它获取对套接字的引用，进而在套接字上调用 `shutdown`。

use std::thread::JoinHandle;
use std::{time, thread};
use std::cell::RefCell;
use std::sync::{Arc, RwLock};

/// In-memory stand-in for a socket, used to demonstrate shutting a socket
/// down from a thread other than the one receiving on it.
struct Socket {
    /// Name the socket was "connected" with; only used in log output.
    name: String,
    /// `true` until `shutdown` is called. An atomic flag is enough here: it is
    /// a single boolean, so no lock (and no poisoning) is involved.
    running: std::sync::atomic::AtomicBool,
}

impl Socket {
    /// Creates a running mock socket named `socket_name` and logs the call.
    fn connect(socket_name: &str) -> Self {
        println!("Connect {}", socket_name);
        Self {
            name: socket_name.to_owned(),
            running: std::sync::atomic::AtomicBool::new(true),
        }
    }

    /// Marks the socket as shut down; every later `recv` reports length 0.
    fn shutdown(&self) {
        println!("Shutdown {}", self.name);
        self.running
            .store(false, std::sync::atomic::Ordering::SeqCst);
    }

    /// Mock receive: reports 100 while the socket is running and 0 once it
    /// has been shut down. The buffer is never written to, and this mock
    /// never returns `Err`.
    fn recv(&self, _buffer: &mut [u8]) -> Result<usize, std::io::Error> {
        // Busy-work standing in for the time a real receive would take.
        let _spin: usize = (0..100_000usize).map(|i| i % 2).sum();

        let running = self.running.load(std::sync::atomic::Ordering::SeqCst);
        Ok(if running { 100 } else { 0 })
    }
}
    
/// Spawns four listener threads and four client threads, then stops each one
/// by shutting down its shared socket from this thread before joining it.
fn main() {
    // Keep a shared handle to every socket next to its thread's join handle,
    // so the socket stays reachable from here after the worker has started.
    let listeners: Vec<(JoinHandle<()>, Arc<RwLock<Socket>>)> = (0..4)
        .map(|i| {
            let listener = UdsListener::new(&format!("/tmp/sock_{i}.sock"));
            let socket = listener.socket();
            (listener.spawn_thread(), socket)
        })
        .collect();

    let clients: Vec<(JoinHandle<()>, Arc<RwLock<Socket>>)> = (0..4)
        .map(|i| {
            let client = UdsClient::new(&format!("/tmp/sock_{i}.sock"));
            let socket = client.socket();
            (client.spawn_thread(), socket)
        })
        .collect();

    // Shutting a socket down makes its worker's next receive report length 0,
    // which ends the worker's loop; only then is the thread joined.
    // Workers are stopped most-recently-spawned first.
    println!("waiting for listener threads to join");
    for (handle, socket) in listeners.into_iter().rev() {
        socket.read().unwrap().shutdown();
        let _ = handle.join();
    }

    println!("waiting for client threads to join");
    for (handle, socket) in clients.into_iter().rev() {
        socket.read().unwrap().shutdown();
        let _ = handle.join();
    }
}


/// Listener whose socket is shared, so a thread other than the worker can
/// shut it down.
pub struct UdsListener {
    socket: Arc<RwLock<Socket>>,
}

impl UdsListener {
    /// Creates a listener with a fresh shared socket named `sock_name`.
    pub fn new(sock_name: &str) -> Self {
        let socket = Arc::new(RwLock::new(Socket::connect(sock_name)));
        Self { socket }
    }

    /// Returns another handle to the same shared socket.
    pub fn socket(&self) -> Arc<RwLock<Socket>> {
        Arc::clone(&self.socket)
    }

    /// Moves the listener onto a new thread and returns that thread's handle.
    pub fn spawn_thread(self) -> thread::JoinHandle<()> {
        thread::spawn(move || self.run())
    }

    /// Receives from the shared socket until a receive reports length 0.
    ///
    /// Panics if a receive fails or the lock is poisoned.
    fn run(&self) {
        let mut packet = [0u8; 8192];
        loop {
            let received = self
                .socket
                .read()
                .unwrap()
                .recv(&mut packet)
                .unwrap_or_else(|e| panic!("uds listener read failed: {e:?}"));
            // A zero-length receive is the signal to stop.
            if received == 0 {
                return;
            }
        }
    }
}

/// Client whose socket is shared, so a thread other than the worker can shut
/// it down.
pub struct UdsClient {
    socket: Arc<RwLock<Socket>>,
}

impl UdsClient {
    /// Creates a client with a fresh shared socket named `sock_name`.
    pub fn new(sock_name: &str) -> Self {
        let socket = Arc::new(RwLock::new(Socket::connect(sock_name)));
        Self { socket }
    }

    /// Returns another handle to the same shared socket.
    pub fn socket(&self) -> Arc<RwLock<Socket>> {
        Arc::clone(&self.socket)
    }

    /// Moves the client onto a new thread and returns that thread's handle.
    pub fn spawn_thread(self) -> thread::JoinHandle<()> {
        thread::spawn(move || self.run())
    }

    /// Receives from the shared socket, logging every reported length, until
    /// a receive reports length 0.
    ///
    /// Panics if a receive fails or the lock is poisoned.
    fn run(&self) {
        let mut packet = [0u8; 8192];
        loop {
            let received = self
                .socket
                .read()
                .unwrap()
                .recv(&mut packet)
                .unwrap_or_else(|e| panic!("uds client read failed: {e:?}"));
            // The length is logged before the stop check, so the final 0 is
            // printed too.
            println!("Recv {}", received);
            if received == 0 {
                return;
            }
        }
    }
}

输出

Connect /tmp/sock_0.sock
Connect /tmp/sock_1.sock
Connect /tmp/sock_2.sock
Connect /tmp/sock_3.sock
Connect /tmp/sock_0.sock
Connect /tmp/sock_1.sock
Connect /tmp/sock_2.sock
Connect /tmp/sock_3.sock
waiting for listener threads to join
Shutdown /tmp/sock_3.sock
Recv 100
Recv 100
Recv 100
Recv 100
Shutdown /tmp/sock_2.sock
Recv 100
Recv 100
Recv 100
Recv 100
Recv 100
Recv 100
Shutdown /tmp/sock_1.sock
Recv 100
Recv 100
Shutdown /tmp/sock_0.sock
Recv 100
Recv 100
Recv 100
Recv 100
Recv 100
waiting for client threads to join
Shutdown /tmp/sock_3.sock
Recv 100
Recv 100
Recv 100
Recv 0
Shutdown /tmp/sock_2.sock
Recv 100
Recv 100
Recv 0
Shutdown /tmp/sock_1.sock
Recv 0
Shutdown /tmp/sock_0.sock
Recv 0
© www.soinside.com 2019 - 2024. All rights reserved.