在 Flask 应用程序后台运行异步 Websocket 循环

问题描述 投票:0回答:1

我正在尝试构建一个应用程序,它将连接到 websocket(用于流数据)并通过 REST API 提供此数据。我正在使用

Flask
创建 API,并使用
websockets
包连接到 websocket 服务器。

这是我到目前为止的代码:

import asyncio
import websockets
from flask import Flask, jsonify

app = Flask(__name__)
websocket = None

async def websocket_setup():
    global websocket
    uri = 'wss://ws.example.com'
    websocket = await websockets.connect(uri)    
    print("WebSocket connected")

async def websocket_loop():
    global websocket
    await websocket_setup()
    while True:
        data = await websocket.recv()
        print("Received:", data) # will save to a global variable

@app.get('/')
def home():
    return jsonify({'msg':'hello'})
# Will have APIs for get data, send msg to websocket, etc

if __name__ == '__main__':
    asyncio.create_task(websocket_loop())
    app.run(debug=False)

错误:

RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'websocket_loop' was never awaited

我将

websocket
保留为全局变量,因为我需要在 API 方法中使用它,例如发送消息。我也可以使用任何其他框架,我尝试过
FastAPI
Sanic
,但没有成功。

python flask asynchronous websocket python-asyncio
1个回答
0
投票

谢谢你的提问。

下面是我的代码草稿版本,它从 websocket 获取信息并将其传递给 Flaskapp。

这里要注意什么:

  1. websocket 作业应该在单独的线程中运行。如果您的代码会变得更大,还可以考虑使用
    taskiq
    作为任务代理。
  2. 要在主作业和任务作业之间进行通信,您可以使用队列
  3. 您应该注意在调试模式下重新加载应用程序期间创建的 websocket 进程的数量。基本上我的代码在生产版本中可以正常工作,但如果您使用应用程序重新加载,可能会开始复制连接到“websocket”的线程。
import asyncio
import websockets
from flask import Flask, jsonify
from queue import Queue, Empty
from threading import Thread
import os
q = Queue()

app = Flask(__name__)

ws_uri = "ws://localhost:8000/chat/ws_proto"

async def websocket_loop(websocket):
    while True:
        data = await websocket.recv()
        print("ws data:", data) 
        # will save to a global queue
        q.put(data)

def ws_thread_task():
    async def ws_run(ws_uri=ws_uri): 
        # always run websocket in the context to properly close it on errors
        async with websockets.connect(ws_uri) as websocket:
            print("WebSocket connected")
            await websocket_loop(websocket)    

    asyncio.run(ws_run())


@app.route('/')
def home():
    print('connected')
    while True:
        try:
            item = q.get_nowait()
            print("recieved message", item)
        except Empty:
            print('q is empty')
            break        

    return jsonify({'msg':'data collection done'})


if __name__ == '__main__':
    
    # this options intended to prevent from creating duplicated threads on app reload
    if not (app.debug or os.environ.get('FLASK_ENV') == 'development') or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        # start websocket job in separate thread
        thread = Thread(target=ws_thread_task, daemon=True )
        thread.start()

    
    app.run(debug=True, port=3333, use_reloader=True)
© www.soinside.com 2019 - 2024. All rights reserved.