我需要在 Python 中同时实现 SSE 服务器和客户端,以便能够在客户端订阅后随时向客户端发送事件。 具体来说,我在服务器中有一个用 Flask 制作的 API,客户端应该能够通过向该 API 发送 GET 来订阅事件通道。
我已经在网上找到了 SSE 客户端在 Python 中的实现,这应该不是问题,但我找不到服务器端实现。我只能找到服务器的 Javascript 实现。
要在 Python 中实现 SSE 服务器和客户端,您可以使用 Flask-SSE 包,它是 Flask 扩展,可以轻松创建 SSE 端点。
首先,您可以使用 pip 安装 Flask-SSE:
pip install flask-sse
接下来可以创建Flask应用并初始化SSE扩展:
from flask import Flask
from flask_sse import sse
app = Flask(__name__)
app.config["REDIS_URL"] = "redis://localhost"
app.register_blueprint(sse, url_prefix='/stream')
在这个例子中,我们使用 Redis 作为 SSE 服务器的后端。
接下来,您可以定义一个函数来生成SSE事件并发送给客户端:
import time
from flask_sse import sse
@app.route('/events')
def stream():
def event_stream():
while True:
yield 'data: {}\n\n'.format("hello")
time.sleep(1)
return Response(event_stream(), mimetype="text/event-stream")
在这个例子中,我们定义了一个路由 /events ,它每秒生成 SSE 事件,消息为“hello”。
最后,您可以创建一个客户端来消费 SSE 事件:
import sseclient
client = sseclient.SSEClient('http://localhost:5000/events')
for event in client.events():
print(event.data)
本客户端使用sseclient库连接到SSE服务器,并将收到的消息打印到控制台。
请注意,这只是一个基本示例,您可以根据需要自定义 SSE 服务器和客户端。