我正在开发一个 Rust 应用程序,我需要实现服务器发送事件 (SSE),以便在 MongoDB 集合发生更改时向客户端发送更新。我正在使用
async_stream
创建一个异步流,该流会从集合中产生更改。但是,我遇到了一个问题,SSE 似乎无法工作,因为线程陷入了 while
循环,等待更改。
这是我的代码的相关部分:
async fn sse() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
let coll: Collection<Document> = db().await.collection("Test");
let mut watching = coll.watch(None, None).await.unwrap();
let watching_stream = async_stream::stream! {
while let Some(change) = watching.next().await {
yield change;
}
};
tokio::spawn(async move {
futures::pin_mut!(watching_stream);
loop {
match watching_stream.select_next_some().await {
Ok(_) => {
match tx.send(Ok(Event::default().data("test".to_string()))) {
Ok(_) => {
println!("tx sent")
}
Err(e) => {println!("{e}")}
}
}
_ => {
match tx.send(Ok(Event::default().data("err".to_string()))) {
Ok(_) => {
println!("err sent")
}
Err(e) => {println!("{e}")}
}
}
}
}
});
Sse::new(stream).keep_alive(KeepAlive::default())
}
我希望 SSE 在集合发生变化后立即将更新返回给客户端,但似乎流没有被发送出去。 有人可以帮助我理解为什么 SSE 没有按预期运行以及如何修复它吗?
在探索了 Rust 的 Axum 框架中实现从 MongoDB 到客户端的实时数据传输的各种方法之后,我得出的结论是,与服务器发送事件 (SSE) 相比,WebSocket 提供了更可靠的解决方案。尽管多次尝试利用 SSE 来实现此目的,但挑战仍然存在,导致我采用了 WebSocket。该方法与Axum的逻辑无缝集成,确保数据立即传输到客户端。对于那些寻求有效方法将新 MongoDB 数据实时推送到客户端的人,我建议根据我的经验将 WebSockets 与 Axum 结合使用。