如何检测Rust Rocket_ws客户端与WebSocket断开连接

问题描述 投票:0回答:1

rocket_ws
文档(https://api.rocket.rs/v0.5/rocket_ws/)我知道我可以使用这段代码与客户端建立websocket连接:

#[get("/echo?channel")]
fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
use rocket::futures::{SinkExt, StreamExt};

ws.channel(move |mut stream| Box::pin(async move {
    while let Some(message) = stream.next().await {
        let _ = stream.send(message?).await;
    }

    Ok(())
}))

}

但是如何检测连接已关闭以及客户端已断开连接?

此示例仅显示使用

stream.next()
读取消息的用例,但是如果我不期望来自客户端的消息,而只想使用
let _ = stream.send(ws::Message::Text(json!(reading).to_string())).await;
定期向他发送新值(使用类似
let mut interval = interval(Duration::from_secs(10));
的内容)怎么办?

rust websocket rust-tokio rust-rocket
1个回答
0
投票

要检测客户端何时与 Websocket 断开连接,您可以侦听从客户端发送的

Close
消息。

代码看起来像这样:

Some(Ok(message)) = stream.next() => {
    match message {
      ws::Message::Close(close_frame) => {
          // Handle Close message
          println!("Received Close message: {:?}", close_frame);
          let close_frame = ws::frame::CloseFrame {
              code: ws::frame::CloseCode::Normal,
              reason: "Client disconected".to_string().into(),
          };
          let _ = stream.close(Some(close_frame)).await;
          break;
        }
}

因此,使用 Rocket_ws 在 Rust 中处理 Websocket 的整个代码将如下所示:

#[get("/ws")]
pub fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
    use rocket::futures::{SinkExt, StreamExt};

    ws.channel(move |mut stream: ws::stream::DuplexStream| {
        Box::pin(async move {
            let mut interval = interval(Duration::from_secs(10));

            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            // Send message every 10 seconds
                            let reading = get_latest_readings().await.unwrap();
                            let _ = stream.send(ws::Message::Text(json!(reading).to_string())).await;
                            // println!("Sent message");
                        }
                        Some(Ok(message)) = stream.next() => {
                            match message {
                                ws::Message::Text(text) => {
                                    // Handle Text message
                                    println!("Received Text message: {}", text);
                                }
                                ws::Message::Binary(data) => {
                                    // Handle Binary message
                                    println!("Received Binary message: {:?}", data);
                                }
                                ws::Message::Close(close_frame) => {
                                    // Handle Close message
                                    println!("Received Close message: {:?}", close_frame);
                                    let close_frame = ws::frame::CloseFrame {
                                        code: ws::frame::CloseCode::Normal,
                                        reason: "Client disconected".to_string().into(),
                                    };
                                    let _ = stream.close(Some(close_frame)).await;
                                    break;
                                }
                                ws::Message::Ping(ping_data) => {
                                    // Handle Ping message
                                    println!("Received Ping message: {:?}", ping_data);
                                }
                                ws::Message::Pong(pong_data) => {
                                    // Handle Pong message
                                    println!("Received Pong message: {:?}", pong_data);
                                }
                                _ => {
                                    println!("Received other message: {:?}", message);
                                }
                            }
                        }
                        else => {
                            println!("Connection closed");
                            let close_frame = ws::frame::CloseFrame {
                                code: ws::frame::CloseCode::Normal,
                                reason: "Client disconected".to_string().into(),
                            };
                            let _ = stream.close(Some(close_frame)).await;
                            // The connection is closed by the client
                            break;
                        }
                    }
                }
            });

            tokio::signal::ctrl_c().await.unwrap();
            Ok(())
        })
    })
}
© www.soinside.com 2019 - 2024. All rights reserved.