我有一个 SSE api,当通过另一个 POST API 端点更新数据时,它会更新前端。它似乎第一次工作,但是当我重新加载页面并通过 POST API 端点运行虚拟数据时,在我的前端从 SSE 读取发送数据之前,我必须多次发送数据。如果我这样做太多次,我会收到一条错误消息,指出连接已关闭。之后,如果我继续通过 POST API 发送数据,它会正常工作,直到我重新加载页面(或者假设关闭选项卡或浏览器?)。
我假设这是因为每当在客户端或服务器端重新加载页面时我都没有关闭连接。没有太多关于此的文档或资源,所以我不得不查看多个代码并尝试从中做出一些东西。
我该如何解决这个问题,或者更准确地说,如果页面刷新、重定向或关闭,我将如何关闭连接(只要该人不在特定页面上,我想关闭连接)。
接收事件流的前端代码
const [eventSource, setEventSource] = createSignal(undefined);
createEffect(() => {
setEventSource(new EventSource(url))
console.log("connected");
eventSource().onmessage = (event) => {
// set the data
};
eventSource().onerror = (event) => {
console.log(`Connection Error: ${event}`);
eventSource()?.close();
};
eventSource().onopen = (event) => {
console.log(`Connection Established: ${event}`);
};
});
onCleanup(() => {
console.log("Closing Connection")
eventSource()?.close();
});
后端代码监听来自 POST 请求的传入数据并将其发送到前端
func sseFunction(appCtx *fiber.Ctx, dataChannel chan map[string]any) error {
setHeader(appCtx)
ctx := appCtx.Context()
ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
log.Println("SSE Opened")
for {
select {
case message := <-dataChannel:
jsonString, err := json.Marshal(message)
if err != nil {
log.Println(err)
}
fmt.Fprintf(w, "data: %s\n\n", jsonString)
err = w.Flush()
if err != nil {
// Refreshing page in web browser will establish a new
// SSE connection, but only (the last) one is alive, so
// dead connections must be closed here.
fmt.Printf("Error while flushing: %v. Closing http connection.\n", err)
return
}
case <-ctx.Done():
fmt.Printf("Client closed connection")
return
}
}
}))
return nil
}
*** 更新 1:经过一些调试,当我关闭连接时,服务器似乎没有运行,但我不确定是否是因为客户端没有将关闭事件发送到后端,或者后端没有接收到它
case <-ctx.Done():
fmt.Printf("Client closed connection")
return
}
在做了一些广泛的研究并有了自己的假设之后,我最终假设
fasthttp
context().done()
没有检测到客户端与服务器的断开连接。
现在问题不是来自
fiber
,而是实际上来自于光纤正在使用的fasthttp context
。由于它不运行 select 语句中的 context().done()
,因此没有运行 return 语句,并且它会循环返回,认为连接仍然处于活动状态。当它尝试将数据发送到不存在的连接时,在多次发送重试后,w.flush()
返回一个错误,最终在错误处理程序块中运行 return 语句。
很多人不建议使用
gofiber
,只是因为fasthttp
(不知道为什么,但我想这是它造成的问题之一)。因此,我只是使用杜松子酒重写了我的应用程序,它运行得很好。没有任何问题,当客户端断开连接时,将调用完成通道。我什至不需要在我的 oncleanup
应用程序中关闭 Solidjs
。
SSE 使用杜松子酒
func sseFunction(appCtx *gin.Context, dataChannel chan map[string]any) {
log.Println("Setting Up SSE connection")
setHeader(appCtx)
appCtx.Stream(func(writer io.Writer) bool {
for {
select {
case message, ok := <-dataChannel:
if !ok {
return false
}
log.Println("Sending data from datachannel to SSE")
appCtx.SSEvent("message", message)
message = nil
return true
case <-appCtx.Request.Context().Done():
return false
}
}
})
}