我正在使用
websockets==13.1
并已阅读文档。 websockets
库提供了keep-alive函数来维持连接,但是keep-alive发送的ping
消息是一个空的ByteFrame。如何修改以下代码来发送像{"op": "ping"}
这样的自定义消息?
async for websocket in websockets.connect(
uri=self._base_url,
ping_interval=self._ping_interval,
ping_timeout=self._ping_timeout,
close_timeout=self._close_timeout,
max_queue=self._max_queue,
):
try:
payload = json.dumps(payload)
await websocket.send(payload)
async for msg in websocket:
msg = orjson.loads(msg)
print(msg)
except websockets.ConnectionClosed:
self._log.error(f"Connection closed, reconnecting...")
要发送像
{"op": "ping"}
这样的自定义 ping 消息而不是 websockets
库提供的默认空 ping 帧,您需要禁用内置的保持活动机制并实现您自己的机制。
禁用内置 Ping: 建立 WebSocket 连接时将
ping_interval
设置为 None
。这会禁用 websockets
库发送的自动 ping 消息。
实现自定义 Ping 功能: 创建一个异步函数,定期发送自定义 ping 消息。
同时运行自定义 Ping 功能: 使用
asyncio.create_task
与主接收循环同时运行自定义 ping 函数。
import asyncio
import json
import websockets
async def send_custom_ping(websocket, interval):
while True:
await websocket.send(json.dumps({"op": "ping"}))
await asyncio.sleep(interval)
async def main():
async with websockets.connect(
uri=self._base_url,
ping_interval=None, # Disable built-in ping
ping_timeout=self._ping_timeout,
close_timeout=self._close_timeout,
max_queue=self._max_queue,
) as websocket:
try:
ping_task = asyncio.create_task(send_custom_ping(websocket, self._ping_interval))
await websocket.send(json.dumps(payload))
async for msg in websocket:
msg = orjson.loads(msg)
print(msg)
except websockets.ConnectionClosed:
self._log.error(f"Connection closed, reconnecting...")
finally:
ping_task.cancel()
asyncio.run(main())