从
rocket_ws
文档(https://api.rocket.rs/v0.5/rocket_ws/)我知道我可以使用这段代码与客户端建立websocket连接:
#[get("/echo?channel")]
fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
use rocket::futures::{SinkExt, StreamExt};
ws.channel(move |mut stream| Box::pin(async move {
while let Some(message) = stream.next().await {
let _ = stream.send(message?).await;
}
Ok(())
}))
}
但是如何检测连接已关闭以及客户端已断开连接?
此示例仅显示使用
stream.next()
读取消息的用例,但是如果我不期望来自客户端的消息,而只想使用 let _ = stream.send(ws::Message::Text(json!(reading).to_string())).await;
定期向他发送新值(使用类似 let mut interval = interval(Duration::from_secs(10));
的内容)怎么办?
要检测客户端何时与 Websocket 断开连接,您可以侦听从客户端发送的
Close
消息。
代码看起来像这样:
Some(Ok(message)) = stream.next() => {
match message {
ws::Message::Close(close_frame) => {
// Handle Close message
println!("Received Close message: {:?}", close_frame);
let close_frame = ws::frame::CloseFrame {
code: ws::frame::CloseCode::Normal,
reason: "Client disconected".to_string().into(),
};
let _ = stream.close(Some(close_frame)).await;
break;
}
}
因此,使用 Rocket_ws 在 Rust 中处理 Websocket 的整个代码将如下所示:
#[get("/ws")]
pub fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
use rocket::futures::{SinkExt, StreamExt};
ws.channel(move |mut stream: ws::stream::DuplexStream| {
Box::pin(async move {
let mut interval = interval(Duration::from_secs(10));
tokio::spawn(async move {
loop {
tokio::select! {
_ = interval.tick() => {
// Send message every 10 seconds
let reading = get_latest_readings().await.unwrap();
let _ = stream.send(ws::Message::Text(json!(reading).to_string())).await;
// println!("Sent message");
}
Some(Ok(message)) = stream.next() => {
match message {
ws::Message::Text(text) => {
// Handle Text message
println!("Received Text message: {}", text);
}
ws::Message::Binary(data) => {
// Handle Binary message
println!("Received Binary message: {:?}", data);
}
ws::Message::Close(close_frame) => {
// Handle Close message
println!("Received Close message: {:?}", close_frame);
let close_frame = ws::frame::CloseFrame {
code: ws::frame::CloseCode::Normal,
reason: "Client disconected".to_string().into(),
};
let _ = stream.close(Some(close_frame)).await;
break;
}
ws::Message::Ping(ping_data) => {
// Handle Ping message
println!("Received Ping message: {:?}", ping_data);
}
ws::Message::Pong(pong_data) => {
// Handle Pong message
println!("Received Pong message: {:?}", pong_data);
}
_ => {
println!("Received other message: {:?}", message);
}
}
}
else => {
println!("Connection closed");
let close_frame = ws::frame::CloseFrame {
code: ws::frame::CloseCode::Normal,
reason: "Client disconected".to_string().into(),
};
let _ = stream.close(Some(close_frame)).await;
// The connection is closed by the client
break;
}
}
}
});
tokio::signal::ctrl_c().await.unwrap();
Ok(())
})
})
}