我正在尝试构建一个应用程序,它将连接到 websocket(用于流数据)并通过 REST API 提供此数据。我正在使用
Flask
创建 API,并使用 websockets
包连接到 websocket 服务器。
这是我到目前为止的代码:
import asyncio
import websockets
from flask import Flask, jsonify
app = Flask(__name__)
websocket = None
async def websocket_setup():
global websocket
uri = 'wss://ws.example.com'
websocket = await websockets.connect(uri)
print("WebSocket connected")
async def websocket_loop():
global websocket
await websocket_setup()
while True:
data = await websocket.recv()
print("Received:", data) # will save to a global variable
@app.get('/')
def home():
return jsonify({'msg':'hello'})
# Will have APIs for get data, send msg to websocket, etc
if __name__ == '__main__':
asyncio.create_task(websocket_loop())
app.run(debug=False)
错误:
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'websocket_loop' was never awaited
我将
websocket
保留为全局变量,因为我需要在 API 方法中使用它,例如发送消息。我也可以使用任何其他框架,我尝试过 FastAPI
和 Sanic
,但没有成功。
谢谢你的提问。
下面是我的代码草稿版本,它从 websocket 获取信息并将其传递给 Flaskapp。
这里要注意什么:
taskiq
作为任务代理。import asyncio
import websockets
from flask import Flask, jsonify
from queue import Queue, Empty
from threading import Thread
import os
q = Queue()
app = Flask(__name__)
ws_uri = "ws://localhost:8000/chat/ws_proto"
async def websocket_loop(websocket):
while True:
data = await websocket.recv()
print("ws data:", data)
# will save to a global queue
q.put(data)
def ws_thread_task():
async def ws_run(ws_uri=ws_uri):
# always run websocket in the context to properly close it on errors
async with websockets.connect(ws_uri) as websocket:
print("WebSocket connected")
await websocket_loop(websocket)
asyncio.run(ws_run())
@app.route('/')
def home():
print('connected')
while True:
try:
item = q.get_nowait()
print("recieved message", item)
except Empty:
print('q is empty')
break
return jsonify({'msg':'data collection done'})
if __name__ == '__main__':
# this options intended to prevent from creating duplicated threads on app reload
if not (app.debug or os.environ.get('FLASK_ENV') == 'development') or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
# start websocket job in separate thread
thread = Thread(target=ws_thread_task, daemon=True )
thread.start()
app.run(debug=True, port=3333, use_reloader=True)