我正在制作使用 binance websocket api 获取信息的应用程序。我们已经有了主要代码库,但决定在独立应用程序中实现 websockets。 看起来像这样:
import asyncio
from threading import Thread
import uvicorn
from fastapi import FastAPI
from binance import AsyncClient, BinanceSocketManager
app = FastAPI()
@app.get("/")
async def root():
return "Websocket app"
async def main_socket_stream():
client = await AsyncClient.create()
binance_manager = BinanceSocketManager(client=client)
multiplex_socket = binance_manager.futures_multiplex_socket(symbols)
async with multiplex_socket as active_socket:
while True:
result = await active_socket.recv()
print(result)
# There are lots of further logic - proceed data, save something to db, etc.
def side_thread():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(main_socket_stream())
if __name__ == "__main__":
thread = Thread(target=side_thread, args=(), daemon=True)
thread.start()
uvicorn.run(app, port=5105)
我删除了一些不必要的部分,但主要思想是清晰的。 所以,我在后台有 websocket - 它可以工作。但我希望能够从主应用程序(基于 Django)控制它们。
我想到了类似的事情:
async with multiplex_socket as active_socket:
while True:
if await EventListenerClass.new_event_fired():
break # or do something special
result = await active_socket.recv()
print(result)
我考虑使用 Redis Pub-Sub 制作一些“EventListener”类并监听通道中的新消息。会有多种类型的消息,例如:“stop_websocket”、“reload_with_updated_symbols_list”等
我需要一个解决方案:
我对 Redis 的想法正确吗?
一般工作流程应包括 Redis 或类似的 Sub-Pub 系统,用于协调消息传递。如果都是同一个应用程序,您将使用互斥锁从共享空间读取数据,这本质上与 Redis 等 Sub-Pub 执行相同的功能。
我熟悉 Swagger(一种 Python REST API 模式/框架)中的相同 Redis 解决方案。 Redis 成为检查状态的一种方法。请注意,尽管小型系统或虚拟机中的 Redis 通常能够在更新后几毫秒内做出响应,但如果您需要低延迟高带宽请求,则可能需要解决性能问题。
您还可以设计自己的实时子酒吧系统,使用网络阻塞来解决并发问题,但是,redis 是一种广泛采用且可用的解决方案,存在于各种平台上。
Qt 使用 Websockets 通过从 C++ 到 Javascript 的绑定来处理其 Cef 客户端集成。这是从主机应用程序向 CEF 窗口发送消息的另一种方式。