tokio Stream 的固定问题

问题描述 投票:0回答:2

我想在 2s 之前和 2s 之后以不同的方式处理传入的 websocket 消息。

这很棘手,因为我们只有一个

read
(显然不能被克隆)并且也不高兴被传递给函数。

我想我会

select!
处理消息和计时器,然后
select!
在计时器融合第一个
select!
之后再次进行阶段 2,将
read
的可变借用传递给不同的处理功能。

事实证明,由于固定,我根本无法将

read
传递给函数。

use std::time::Duration;

use futures_util::{Stream, StreamExt};
use tokio_tungstenite::connect_async; 

async fn wait_2_seconds() {
    tokio::time::sleep(Duration::from_secs(2)).await;
}

async fn process_messages(read: &mut impl Stream) {
    while let Some(m) = read.next().await {
        let data = m.unwrap().into_data();
        println!("{m:?}");
    }
}

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();

    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    // don't plan on sending anything to ws server so discard write half
    let (_, read) = ws_stream.split();

    tokio::select!{
        _ = process_messages(&mut read) => {}, 
        _ = wait_2_seconds() => {}, 
    };
    
    println!("phase 1 complete");
}

所以我不确定如何将(大量借用的)

read
传递给函数。

错误消息说考虑使用

Box::pin
但后来我意识到我什至知道how在这种情况下使用
Box::pin
。我尝试将
process_messages
参数类型更改为
Box<Pin<&mut impl Stream>>
并意识到我需要帮助。

rust stream rust-tokio
2个回答
1
投票

只需将

read
固定在
main()
中。您可以
Box::pin()
它或更好的
tokio::pin!()
它(或
futures::pin_mut()
,甚至每晚
std::pin::pin!()
)。您还需要指定流的
Item
类型。然后在
Pin<&mut impl Stream<Item = ...>>
中取
process_messages()

use std::pin::Pin;

use tokio_tungstenite::tungstenite::error::Error;
use tokio_tungstenite::tungstenite::protocol::Message;

async fn process_messages(mut read: Pin<&mut impl Stream<Item = Result<Message, Error>>>) {
    while let Some(m) = read.next().await {
        let data = m.unwrap().into_data();
        println!("{data:?}");
    }
}

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();

    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    // don't plan on sending anything to ws server so discard write half
    let (_, read) = ws_stream.split();
    // Or `let read = Box::pin(read);`.
    tokio::pin!(read);

    tokio::select! {
        _ = process_messages(read.as_mut()) => {},
        _ = wait_2_seconds() => {},
    };

    println!("phase 1 complete");

    // Process after 2 seconds.
    process_messages(read).await;
}

0
投票

错误消息说考虑使用

Box::pin
但后来我意识到我什至知道如何在这种情况下使用
Box::pin
。我尝试将
process_messages
参数类型更改为
Box<Pin<&mut impl Stream>>
并意识到我需要帮助。

你快到了。

Box::pin
返回
Pin<Box<_>>
,而不是
Box<Pin<_>>
。一些更细微的调整和
Box::pin
编译:

/*
[dependencies]
tokio = { version = "*", features = ["full"] }
tokio-tungstenite = "*"
tungstenite = "*"
futures-util = "*"
url = "*"
*/
use std::time::Duration;
use std::pin::Pin;

use futures_util::{Stream, StreamExt};
use tokio_tungstenite::connect_async;

use tungstenite::protocol::Message;
use tungstenite::error::Error;

async fn wait_2_seconds() {
    tokio::time::sleep(Duration::from_secs(2)).await;
}

async fn process_messages(mut read: Pin<Box<impl Stream<Item=Result<Message, Error>>>>) {
    while let Some(m) = read.next().await {
        let data = m.unwrap().into_data();
        println!("{data:?}");
    }
}

#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();

    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    // don't plan on sending anything to ws server so discard write half
    let (_, read) = ws_stream.split();

    tokio::select!{
        _ = process_messages(Box::pin(read)) => {}, 
        _ = wait_2_seconds() => {}, 
    };
    
    println!("phase 1 complete");
}

Rustexplorer(编译,但在运行时出现恐慌,因为您无法绑定到套接字)。

© www.soinside.com 2019 - 2024. All rights reserved.