ExpressJS 服务器没有为 SSE 保持连接打开

问题描述 投票:0回答:1

我正在尝试实现 SSE 的通知功能,当用户第一次连接到流时,后端应该将当前的通知写入其中,这似乎有效。但是,它会立即关闭连接,而不是保持连接打开状态,以便我将来可以向其写入新通知:

后端通知处理程序:

var clientStreams = []; // this is a list of all the streams opened by pract users to the backend
var newNotifications = [];
// every time a practitioner connects this event handler is run
// and a new stream is created for the newly connected practitioner

async function notificationEventsHandler(req, res) {
  const headers = {
    "Content-Type": "text/event-stream",
    Connection: "keep-alive",
    "Cache-Control": "no-cache",
  };

  const clientEmail = req.headers.patientemail;

  const data = await ApptNotificationData.findAll({
    where: {
      patientEmail: clientEmail,
      accepted: "1",
    },
  });

  res.writeHead(200, headers);
  res.write(`data:${JSON.stringify(data)}\n\n`);

  const newClientStream = {
    clientEmail: clientEmail,
    res,
  };

  // add the new stream to list of streams
  clientStreams.push(newClientStream);
  //console.log("PRACT STREAMS", clientStreams)

  req.on("close", () => {
    console.log(`${clientEmail} Connection closed`);
    clientStreams = clientStreams.filter(
      (client) => client.clientEmail !== client.clientEmail
    );
    
  });

  
}

前端:

useEffect(() => {
    const fetchData = async () => {
    await fetchEventSource("https://api.***.com/client_notifications", {
        method: "GET",
        headers: {
          Accept: "text/event-stream",
          patientemail: patientData.email
        },
        onopen(res) {
          if (res.ok && res.status === 200) {
            console.log("Connection made ", res);          
          } else if (
            res.status >= 400 &&
            res.status < 500 &&
            res.status !== 429
          ) {
            console.log("Client side error ", res);
          }
        },
        onmessage(event) {
          const data = JSON.parse(event.data)
        },
        onclose() {
          console.log("Connection closed by the server");
        },
        onerror(err) {
          console.log("There was an error from server", err);
        },
      })
    }
    fetchData();
  }, [])

nginx 配置:

server {
        
    listen 80;
    listen [::]:80;

    server_name api.***.com;

    location /client_notifications {
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $host;
            proxy_pass http://nodejs:3000;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "";
            chunked_transfer_encoding off;
            proxy_buffering off;
            proxy_cache off;


            if ($request_method = 'OPTIONS') {
                    add_header 'Access-Control-Allow-Origin' '*';
                    add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
                    add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,practemail,patientEmail';
                    add_header 'Access-Control-Max-Age' 1728000;
                    add_header 'Content-Type' 'text/plain; charset=utf-8';
                    add_header 'Content-Length' 0;
                    return 204;
            }
            if ($request_method = 'POST') {
                    add_header 'Access-Control-Allow-Origin' '*' always;
                    add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always;
                    add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,practemail,patientEmail' always;
                    add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range' always;
            }
            if ($request_method = 'GET') {
                    add_header 'Access-Control-Allow-Origin' '*' always;
                    add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always;
                    add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,practemail,patientEmail' always;
                    add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range' always;
            }
    }
}


server {
listen 443 ssl;

server_name api.**.com www.api.**.com;
ssl_certificate     /etc/ssl/-.crt;
ssl_certificate_key /etc/ssl/-.key;


    location /client_notifications {
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $host;
            proxy_pass http://nodejs:3000;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "";
            chunked_transfer_encoding off;
            proxy_buffering off;
            proxy_cache off;
            
    }
}

如何保持连接打开?先谢谢你了

express server-sent-events
1个回答
0
投票

您遇到的问题是因为服务器在发送初始数据后关闭连接。在服务器发送事件 (SSE) 中,您需要保持 HTTP 连接打开,以便服务器可以在新事件发生时将其推送到客户端。

发生了什么:

您的 notificationEventsHandler 发送初始数据,然后函数结束,这允许响应关闭。 如果没有任何正在进行的操作或事件侦听器,Node.js 会认为工作已完成并关闭响应。 解决方案:

保持连接打开:

不要结束回复:

避免任何调用 res.end() 或允许函数完成而不设置保持活动状态的机制的代码。

保持事件循环忙碌:

使用 setInterval、setTimeout 或事件发射器等机制来保持 Node.js 事件循环处于活动状态。

修改您的后端处理程序:

更新您的 notificationEventsHandler 以确保连接保持打开状态并可以接收新通知。

更新后端代码:

const clientStreams = []; // List of connected clients

async function notificationEventsHandler(req, res) {
  const headers = {
    "Content-Type": "text/event-stream",
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Access-Control-Allow-Origin": "*", // If needed for CORS
  };

  res.writeHead(200, headers);

  const clientEmail = req.headers.patientemail;

  // Send initial data
  const data = await ApptNotificationData.findAll({
    where: {
      patientEmail: clientEmail,
      accepted: "1",
    },
  });

  res.write(`data: ${JSON.stringify(data)}\n\n`);

  // Add client to the list
  const newClientStream = {
    clientEmail: clientEmail,
    res,
  };
  clientStreams.push(newClientStream);

  // Keep the connection alive with a heartbeat (optional but recommended)
  const keepAliveInterval = setInterval(() => {
    res.write(":\n\n"); // Comment to keep the connection alive
  }, 30000); // Every 30 seconds

  // Remove client on disconnect
  req.on("close", () => {
    console.log(`${clientEmail} Connection closed`);
    clearInterval(keepAliveInterval); // Stop the keep-alive interval
    clientStreams.splice(clientStreams.indexOf(newClientStream), 1);
  });
}

// Function to send notifications to all connected clients
function sendNotificationToClients(notification) {
  clientStreams.forEach((client) => {
    client.res.write(`data: ${JSON.stringify(notification)}\n\n`);
  });
}

说明:

保持回复开放:

通过不结束响应(res.end()),连接保持打开状态。 setInterval 函数每 30 秒发送一次心跳,以防止代理或浏览器关闭空闲连接。 管理连接的客户端:

维护已连接客户端(clientStreams)的列表以向他们发送新通知。 当客户端断开连接时,将其从列表中删除。 发送通知:

使用单独的函数 (sendNotificationToClients) 向所有连接的客户端发送新通知。 Nginx 配置:

更新您的 Nginx 配置以正确处理 SSE:

server {
    listen 80;
    server_name api.yourdomain.com;

    location /client_notifications {
        proxy_pass http://nodejs:3000;

        # SSE-specific settings
        proxy_set_header Host $host;
        proxy_set_header Connection '';
        proxy_http_version 1.1;
        chunked_transfer_encoding off;
        proxy_buffering off;
        proxy_cache off;

        # CORS headers (if needed)
        add_header Access-Control-Allow-Origin *;
        add_header Access-Control-Allow-Methods GET;
        add_header Access-Control-Allow-Headers DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,practemail,patientEmail;
    }
}

说明:

禁用缓冲和缓存:

proxy_buffering 关闭;确保数据立即发送到客户端。 proxy_cache 关闭;禁用 SSE 响应的缓存。 设置正确的标题:

proxy_set_header 连接'';确保连接保持打开状态。 proxy_http_版本 1.1;使用 HTTP/1.1 支持持久连接。 前端注意事项:

您的前端代码似乎大部分是正确的。确保您正确处理重新连接和错误。

useEffect(() => {
  const fetchData = async () => {
    await fetchEventSource("https://api.yourdomain.com/client_notifications", {
      method: "GET",
      headers: {
        Accept: "text/event-stream",
        patientemail: patientData.email,
      },
      onopen(res) {
        if (res.ok && res.status === 200) {
          console.log("Connection made ", res);
        } else if (res.status >= 400 && res.status < 500 && res.status !== 429) {
          console.log("Client side error ", res);
        }
      },
      onmessage(event) {
        const data = JSON.parse(event.data);
        // Handle incoming data
      },
      onclose() {
        console.log("Connection closed by the server");
        // Optionally implement reconnection logic
      },
      onerror(err) {
        console.log("There was an error from server", err);
        // Optionally implement reconnection logic
      },
    });
  };
  fetchData();
}, []);

其他提示:

确保 CORS 配置正确:

如果您的前端位于不同的域中,请确保服务器发送适当的 Access-Control-Allow-Origin 标头。 处理保持活动机制:

某些服务器或代理可能会关闭空闲连接。发送评论(: ) 定期保持连接处于活动状态。 检查 Nginx 日志中的错误:

在 Nginx 或服务器日志中查找可能表明连接关闭原因的任何错误。 确保没有 res.end() 或类似的调用:

仔细检查您的代码,确保没有任何代码可能会无意中关闭响应。 为什么这有效:

持久连接:

通过保持响应打开而不结束它,您可以维持持久连接,允许服务器在数据可用时将数据发送到客户端。 事件循环活动:

setInterval 和事件监听器确保 Node.js 保持事件循环处于活动状态,防止进程退出。 正确的代理处理:

配置 Nginx 禁用缓冲和缓存可确保 SSE 数据立即传递到客户端。 结论:

实施这些更改应保持 SSE 连接打开,以便您可以在新通知发生时将其推送到客户端。

参考资料:

MDN Web 文档 - 服务器发送的事件 NGINX - 为服务器发送的事件配置 NGINX Express.js SSE 示例 发送新通知的示例:

每当创建新通知时(例如,在您的应用程序逻辑中),您可以调用:

const newNotification = {
  message: "You have a new notification",
  timestamp: new Date(),
};

sendNotificationToClients(newNotification);

这会将通知发送给所有连接的客户端。

© www.soinside.com 2019 - 2024. All rights reserved.