在闭包之间共享Arc

问题描述 投票:1回答:2

我正在尝试编写一个简单的tcp服务器来读取和广播消息。 我正在使用Tokio,但我认为这更像是一个普通的Rust问题。

我有一个共享状态的Arc: let state = Arc::new(Mutex::new(Shared::new(server_tx)));

后来我想生成2个线程,它们将使用对该状态的引用:

let server = listener.incoming().for_each(move |socket| {
    // error[E0382]: capture of moved value: `state`
    process(socket, state.clone());
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

let receive_sensor_messages = sensors_rx.for_each(move |line| {
    println!("Received sensor message, broadcasting: {:?}", line);

    // error[E0597]: borrowed value does not live long enough
    // error[E0507]: cannot move out of borrowed content 
    for (_, tx) in state.clone().lock().unwrap().clients {
        tx.unbounded_send(line.clone()).unwrap();
    }
    Ok(())
}).map_err(|err| {
    println!("line reading error = {:?}", err);
});

(Qazxswpoi)

据我所知,它试图告诉我的是playground是在第一次封闭state借来的,所以当我试图在listener.incoming().for_each(move |socket| {再次这样做时,它说这是不可能的。

我的问题是如何解决它?是不是sensors_rx.for_each(move |line| {应该解决在线程之间共享变量的问题?我尝试了Arc的不同组合(在闭包之外做克隆,然后再在里面做clone),但没有一个工作。

干杯!

rust
2个回答
1
投票

基本上,你的问题可以归结为clone

to the following MCVE:

现在,这里的第一个问题是use std::sync::{Arc, Mutex}; struct Bar; fn foo(_ : &Bar){ println!("foo called"); } fn main(){ let example = Arc::new(Mutex::new(Bar)); std::thread::spawn(move ||{ let _ = example.clone(); }); // --- (1) --- std::thread::spawn(move ||{ foo(&example.clone().lock().unwrap()); }); } 被移动了。也就是说,一旦我们越过example,原来的(1)被认为是从。相反,我们需要首先example然后clone

move

另一个错误源于短暂的 let example = Arc::new(Mutex::new(Bar)); let local_state = example.clone(); std::thread::spawn(move ||{ let _ = local_state; // now fine! }); 。从本质上讲,它只能让你在我们的基础Arc上使用lock。虽然我们知道至少有一个其他Mutex指向内存,但编译器无法证明这一点。但是,如果我们摆脱Arc它没关系:

clone()

但是,您还可以通过使用其内容( let local_state = example.clone(); std::thread::spawn(move ||{ foo(&local_state.lock().unwrap()); }); )来遍历容器。相反,在那里使用clients,例如&)。

你可以在&local_state().unwrap().clients下面找到完整的固定代码:

or on the playground

1
投票

对于每个关闭,你必须提供自己的use std::sync::{Arc, Mutex}; struct Bar; fn foo(_ : &Bar){ println!("foo called"); } fn main(){ let example = Arc::new(Mutex::new(Bar)); let local_state = example.clone(); std::thread::spawn(move ||{ let _ = local_state; }); let local_state = example.clone(); std::thread::spawn(move ||{ foo(&local_state.lock().unwrap()); }).join(); } ,所以你必须事先Arc你的clone

Arc

你可以在这里省略let state = Arc::new(Mutex::new(Shared::new(server_tx))); let state1 = Arc::clone(&state); let state2 = Arc::clone(&state); let server = listener.incoming().for_each(move |socket| { process(socket, state1.clone()); Ok(()) }); let receive_sensor_messages = sensors_rx.for_each(move |line| { println!("Received sensor message, broadcasting: {:?}", line); let shared = state2.lock().unwrap(); for (_, tx) in &shared.clients { // better: `for tx in shared.clients.values()` tx.unbounded_send(line.clone()).unwrap(); } Ok(()) }); ,但我觉得这样做比较干净。

这样做的原因是,你将值state1移动到第一个闭包中,所以你不能在第二个闭包中使用它,因为它已经被移动了(有意义,不是吗?)。

© www.soinside.com 2019 - 2024. All rights reserved.