我想在 2s 之前和 2s 之后以不同的方式处理传入的 websocket 消息。
这很棘手,因为我们只有一个
read
(显然不能被克隆)并且也不高兴被传递给函数。
我想我会
select!
处理消息和计时器,然后 select!
在计时器融合第一个 select!
之后再次进行阶段 2,将 read
的可变借用传递给不同的处理功能。
事实证明,由于固定,我根本无法将
read
传递给函数。
use std::time::Duration;
use futures_util::{Stream, StreamExt};
use tokio_tungstenite::connect_async;
async fn wait_2_seconds() {
tokio::time::sleep(Duration::from_secs(2)).await;
}
async fn process_messages(read: &mut impl Stream) {
while let Some(m) = read.next().await {
let data = m.unwrap().into_data();
println!("{m:?}");
}
}
#[tokio::main]
async fn main() {
let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
// don't plan on sending anything to ws server so discard write half
let (_, read) = ws_stream.split();
tokio::select!{
_ = process_messages(&mut read) => {},
_ = wait_2_seconds() => {},
};
println!("phase 1 complete");
}
所以我不确定如何将(大量借用的)
read
传递给函数。
错误消息说考虑使用
Box::pin
但后来我意识到我什至知道how在这种情况下使用Box::pin
。我尝试将 process_messages
参数类型更改为 Box<Pin<&mut impl Stream>>
并意识到我需要帮助。
只需将
read
固定在 main()
中。您可以Box::pin()
它或更好的tokio::pin!()
它(或futures::pin_mut()
,甚至每晚std::pin::pin!()
)。您还需要指定流的Item
类型。然后在Pin<&mut impl Stream<Item = ...>>
中取process_messages()
:
use std::pin::Pin;
use tokio_tungstenite::tungstenite::error::Error;
use tokio_tungstenite::tungstenite::protocol::Message;
async fn process_messages(mut read: Pin<&mut impl Stream<Item = Result<Message, Error>>>) {
while let Some(m) = read.next().await {
let data = m.unwrap().into_data();
println!("{data:?}");
}
}
#[tokio::main]
async fn main() {
let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
// don't plan on sending anything to ws server so discard write half
let (_, read) = ws_stream.split();
// Or `let read = Box::pin(read);`.
tokio::pin!(read);
tokio::select! {
_ = process_messages(read.as_mut()) => {},
_ = wait_2_seconds() => {},
};
println!("phase 1 complete");
// Process after 2 seconds.
process_messages(read).await;
}
错误消息说考虑使用
但后来我意识到我什至知道如何在这种情况下使用Box::pin
。我尝试将Box::pin
参数类型更改为process_messages
并意识到我需要帮助。Box<Pin<&mut impl Stream>>
你快到了。
Box::pin
返回 Pin<Box<_>>
,而不是 Box<Pin<_>>
。一些更细微的调整和Box::pin
编译:
/*
[dependencies]
tokio = { version = "*", features = ["full"] }
tokio-tungstenite = "*"
tungstenite = "*"
futures-util = "*"
url = "*"
*/
use std::time::Duration;
use std::pin::Pin;
use futures_util::{Stream, StreamExt};
use tokio_tungstenite::connect_async;
use tungstenite::protocol::Message;
use tungstenite::error::Error;
async fn wait_2_seconds() {
tokio::time::sleep(Duration::from_secs(2)).await;
}
async fn process_messages(mut read: Pin<Box<impl Stream<Item=Result<Message, Error>>>>) {
while let Some(m) = read.next().await {
let data = m.unwrap().into_data();
println!("{data:?}");
}
}
#[tokio::main]
async fn main() {
let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
// don't plan on sending anything to ws server so discard write half
let (_, read) = ws_stream.split();
tokio::select!{
_ = process_messages(Box::pin(read)) => {},
_ = wait_2_seconds() => {},
};
println!("phase 1 complete");
}
Rustexplorer(编译,但在运行时出现恐慌,因为您无法绑定到套接字)。