我正在尝试实现 SSE 的通知功能,当用户第一次连接到流时,后端应该将当前的通知写入其中,这似乎有效。但是,它会立即关闭连接,而不是保持连接打开状态,以便我将来可以向其写入新通知:
后端通知处理程序:
var clientStreams = []; // this is a list of all the streams opened by pract users to the backend
var newNotifications = [];
// every time a practitioner connects this event handler is run
// and a new stream is created for the newly connected practitioner
async function notificationEventsHandler(req, res) {
const headers = {
"Content-Type": "text/event-stream",
Connection: "keep-alive",
"Cache-Control": "no-cache",
};
const clientEmail = req.headers.patientemail;
const data = await ApptNotificationData.findAll({
where: {
patientEmail: clientEmail,
accepted: "1",
},
});
res.writeHead(200, headers);
res.write(`data:${JSON.stringify(data)}\n\n`);
const newClientStream = {
clientEmail: clientEmail,
res,
};
// add the new stream to list of streams
clientStreams.push(newClientStream);
//console.log("PRACT STREAMS", clientStreams)
req.on("close", () => {
console.log(`${clientEmail} Connection closed`);
clientStreams = clientStreams.filter(
(client) => client.clientEmail !== client.clientEmail
);
});
}
前端:
useEffect(() => {
const fetchData = async () => {
await fetchEventSource("https://api.***.com/client_notifications", {
method: "GET",
headers: {
Accept: "text/event-stream",
patientemail: patientData.email
},
onopen(res) {
if (res.ok && res.status === 200) {
console.log("Connection made ", res);
} else if (
res.status >= 400 &&
res.status < 500 &&
res.status !== 429
) {
console.log("Client side error ", res);
}
},
onmessage(event) {
const data = JSON.parse(event.data)
},
onclose() {
console.log("Connection closed by the server");
},
onerror(err) {
console.log("There was an error from server", err);
},
})
}
fetchData();
}, [])
nginx 配置:
server {
listen 80;
listen [::]:80;
server_name api.***.com;
location /client_notifications {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
proxy_pass http://nodejs:3000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "";
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
if ($request_method = 'OPTIONS') {
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,practemail,patientEmail';
add_header 'Access-Control-Max-Age' 1728000;
add_header 'Content-Type' 'text/plain; charset=utf-8';
add_header 'Content-Length' 0;
return 204;
}
if ($request_method = 'POST') {
add_header 'Access-Control-Allow-Origin' '*' always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always;
add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,practemail,patientEmail' always;
add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range' always;
}
if ($request_method = 'GET') {
add_header 'Access-Control-Allow-Origin' '*' always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always;
add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,practemail,patientEmail' always;
add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range' always;
}
}
}
server {
listen 443 ssl;
server_name api.**.com www.api.**.com;
ssl_certificate /etc/ssl/-.crt;
ssl_certificate_key /etc/ssl/-.key;
location /client_notifications {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
proxy_pass http://nodejs:3000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "";
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
}
}
如何保持连接打开?先谢谢你了
发生了什么:
您的 notificationEventsHandler 发送初始数据,然后函数结束,这允许响应关闭。 如果没有任何正在进行的操作或事件侦听器,Node.js 会认为工作已完成并关闭响应。 解决方案:
保持连接打开:
不要结束回复:
避免任何调用 res.end() 或允许函数完成而不设置保持活动状态的机制的代码。
保持事件循环忙碌:
使用 setInterval、setTimeout 或事件发射器等机制来保持 Node.js 事件循环处于活动状态。
修改您的后端处理程序:
更新您的 notificationEventsHandler 以确保连接保持打开状态并可以接收新通知。
更新后端代码:
const clientStreams = []; // List of connected clients
async function notificationEventsHandler(req, res) {
const headers = {
"Content-Type": "text/event-stream",
"Connection": "keep-alive",
"Cache-Control": "no-cache",
"Access-Control-Allow-Origin": "*", // If needed for CORS
};
res.writeHead(200, headers);
const clientEmail = req.headers.patientemail;
// Send initial data
const data = await ApptNotificationData.findAll({
where: {
patientEmail: clientEmail,
accepted: "1",
},
});
res.write(`data: ${JSON.stringify(data)}\n\n`);
// Add client to the list
const newClientStream = {
clientEmail: clientEmail,
res,
};
clientStreams.push(newClientStream);
// Keep the connection alive with a heartbeat (optional but recommended)
const keepAliveInterval = setInterval(() => {
res.write(":\n\n"); // Comment to keep the connection alive
}, 30000); // Every 30 seconds
// Remove client on disconnect
req.on("close", () => {
console.log(`${clientEmail} Connection closed`);
clearInterval(keepAliveInterval); // Stop the keep-alive interval
clientStreams.splice(clientStreams.indexOf(newClientStream), 1);
});
}
// Function to send notifications to all connected clients
function sendNotificationToClients(notification) {
clientStreams.forEach((client) => {
client.res.write(`data: ${JSON.stringify(notification)}\n\n`);
});
}
说明:
保持回复开放:
通过不结束响应(res.end()),连接保持打开状态。 setInterval 函数每 30 秒发送一次心跳,以防止代理或浏览器关闭空闲连接。 管理连接的客户端:
维护已连接客户端(clientStreams)的列表以向他们发送新通知。 当客户端断开连接时,将其从列表中删除。 发送通知:
使用单独的函数 (sendNotificationToClients) 向所有连接的客户端发送新通知。 Nginx 配置:
更新您的 Nginx 配置以正确处理 SSE:
server {
listen 80;
server_name api.yourdomain.com;
location /client_notifications {
proxy_pass http://nodejs:3000;
# SSE-specific settings
proxy_set_header Host $host;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
# CORS headers (if needed)
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Methods GET;
add_header Access-Control-Allow-Headers DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,practemail,patientEmail;
}
}
说明:
禁用缓冲和缓存:
proxy_buffering 关闭;确保数据立即发送到客户端。 proxy_cache 关闭;禁用 SSE 响应的缓存。 设置正确的标题:
proxy_set_header 连接'';确保连接保持打开状态。 proxy_http_版本 1.1;使用 HTTP/1.1 支持持久连接。 前端注意事项:
您的前端代码似乎大部分是正确的。确保您正确处理重新连接和错误。
useEffect(() => {
const fetchData = async () => {
await fetchEventSource("https://api.yourdomain.com/client_notifications", {
method: "GET",
headers: {
Accept: "text/event-stream",
patientemail: patientData.email,
},
onopen(res) {
if (res.ok && res.status === 200) {
console.log("Connection made ", res);
} else if (res.status >= 400 && res.status < 500 && res.status !== 429) {
console.log("Client side error ", res);
}
},
onmessage(event) {
const data = JSON.parse(event.data);
// Handle incoming data
},
onclose() {
console.log("Connection closed by the server");
// Optionally implement reconnection logic
},
onerror(err) {
console.log("There was an error from server", err);
// Optionally implement reconnection logic
},
});
};
fetchData();
}, []);
其他提示:
确保 CORS 配置正确:
如果您的前端位于不同的域中,请确保服务器发送适当的 Access-Control-Allow-Origin 标头。 处理保持活动机制:
某些服务器或代理可能会关闭空闲连接。发送评论(: ) 定期保持连接处于活动状态。 检查 Nginx 日志中的错误:
在 Nginx 或服务器日志中查找可能表明连接关闭原因的任何错误。 确保没有 res.end() 或类似的调用:
仔细检查您的代码,确保没有任何代码可能会无意中关闭响应。 为什么这有效:
持久连接:
通过保持响应打开而不结束它,您可以维持持久连接,允许服务器在数据可用时将数据发送到客户端。 事件循环活动:
setInterval 和事件监听器确保 Node.js 保持事件循环处于活动状态,防止进程退出。 正确的代理处理:
配置 Nginx 禁用缓冲和缓存可确保 SSE 数据立即传递到客户端。 结论:
实施这些更改应保持 SSE 连接打开,以便您可以在新通知发生时将其推送到客户端。
参考资料:
MDN Web 文档 - 服务器发送的事件 NGINX - 为服务器发送的事件配置 NGINX Express.js SSE 示例 发送新通知的示例:
每当创建新通知时(例如,在您的应用程序逻辑中),您可以调用:
const newNotification = {
message: "You have a new notification",
timestamp: new Date(),
};
sendNotificationToClients(newNotification);
这会将通知发送给所有连接的客户端。