如何用Python实现SSE(Server Sent Events)服务端和客户端?

问题描述 投票:0回答:1

我需要在 Python 中同时实现 SSE 服务器和客户端,以便能够在客户端订阅后随时向客户端发送事件。 具体来说,我在服务器中有一个用 Flask 制作的 API,客户端应该能够通过向该 API 发送 GET 来订阅事件通道。

我已经在网上找到了 SSE 客户端在 Python 中的实现,这应该不是问题,但我找不到服务器端实现。我只能找到服务器的 Javascript 实现。

python server client server-sent-events web-of-things
1个回答
0
投票

要在 Python 中实现 SSE 服务器和客户端,您可以使用 Flask-SSE 包,它是 Flask 扩展,可以轻松创建 SSE 端点。

首先,您可以使用 pip 安装 Flask-SSE:

pip install flask-sse

接下来可以创建Flask应用并初始化SSE扩展:

from flask import Flask
from flask_sse import sse

app = Flask(__name__)
app.config["REDIS_URL"] = "redis://localhost"
app.register_blueprint(sse, url_prefix='/stream')

在这个例子中,我们使用 Redis 作为 SSE 服务器的后端。

接下来,您可以定义一个函数来生成SSE事件并发送给客户端:

import time
from flask_sse import sse

@app.route('/events')
def stream():
    def event_stream():
        while True:
            yield 'data: {}\n\n'.format("hello")
            time.sleep(1)

    return Response(event_stream(), mimetype="text/event-stream")

在这个例子中,我们定义了一个路由 /events ,它每秒生成 SSE 事件,消息为“hello”。

最后,您可以创建一个客户端来消费 SSE 事件:

import sseclient

client = sseclient.SSEClient('http://localhost:5000/events')
for event in client.events():
    print(event.data)

本客户端使用sseclient库连接到SSE服务器,并将收到的消息打印到控制台。

请注意,这只是一个基本示例,您可以根据需要自定义 SSE 服务器和客户端。

© www.soinside.com 2019 - 2024. All rights reserved.