我有 3 个客户端,它们定期向我的服务器发送数据。我使用的是 FastAPI 文档中的示例。
这是我的服务器代码:
class ConnectionManager:
    """Track accepted WebSocket connections and send messages to them."""

    def __init__(self):
        # Connections that were accepted and have not been disconnected yet.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake on *websocket* and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*; do nothing if it is not tracked."""
        # Guarded so that cleaning up the same connection twice does not
        # raise ValueError from list.remove().
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_response(self, message: dict, websocket: WebSocket):
        """Send *message* as JSON to a single connection."""
        await websocket.send_json(message)

    async def broadcast(self, message: str):
        """Send the text *message* to every tracked connection."""
        # Iterate over a snapshot: every send awaits, so disconnect() can run
        # in between and mutate the live list, which would skip a connection.
        for connection in list(self.active_connections):
            await connection.send_text(message)
# Module-level manager instance used by the endpoint below.
manager = ConnectionManager()


@app.websocket("/chargeStationState/{client_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    client_id: int,
    db: Session = Depends(deps.get_db),
):
    """Answer every JSON message from a client with a JSON status response.

    Args:
        websocket: The incoming WebSocket connection.
        client_id: Identifier taken from the URL path.
        db: Session injected through ``Depends(deps.get_db)``; the code shown
            here does not use it yet (see the TODO below).
    """
    await manager.connect(websocket)
    try:
        while True:
            message = await websocket.receive_json()
            logging.info(message)
            # TODO: read data from db
            response = {
                "stations": "repsonse",
                "timestamp": int(time.time()),
            }
            await manager.send_response(response, websocket)
            # await manager.broadcast(f"Client #{client_id} says: {message}")
    except WebSocketDisconnect:
        logging.info("Client #%s disconnected", client_id)
    finally:
        # Untrack the connection on every exit path, not only on a clean
        # disconnect; otherwise an unexpected error (e.g. malformed JSON)
        # would leave a dead socket in the manager.
        manager.disconnect(websocket)
这是客户端代码:
async def main():
    """Send a station message to the server and print the reply, repeatedly.

    Per the websockets documentation, iterating over ``connect()`` yields a
    new connection each time the loop body finishes, so each pass below uses
    its own connection.
    """
    message = {'name':'station1'}
    async for websocket in websockets.connect("ws://127.0.0.1:8001/chargeStationState/1"):
        try:
            await websocket.send(json.dumps(message))
            p = await asyncio.wait_for(websocket.recv(), timeout=10)
            print(p)
        except websockets.ConnectionClosed as exc:
            # The server closed this connection (e.g. code 1000); report it
            # and let the iterator open a new one instead of crashing.
            print(f"connection closed: {exc}")
        # Pause between messages (and before reconnecting after a close).
        await asyncio.sleep(2)


if __name__ == "__main__":
    asyncio.run(main())
所以,我想要 5 个客户端与我的服务器通信并发送传感器数据,但 5 分钟后我收到以下错误:
websockets.exceptions.ConnectionClosedOK: received 1000 (OK); then sent 1000 (OK)
我无法确定问题所在。
我测试了您提供的代码,但无法重现您所说的问题,因此问题可能出在代码的其他地方。下面基于您问题中的代码给出一个可运行的示例。相关示例可以在这里、这里以及这里找到。
app.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
import time
class ConnectionManager:
    """Track accepted WebSocket connections and send JSON messages to them."""

    def __init__(self):
        # Connections that were accepted and have not been disconnected yet.
        # Built-in ``list`` is used because ``typing.List`` is never imported.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake on *websocket* and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*; do nothing if it is not tracked."""
        # Guarded so that cleaning up the same connection twice does not
        # raise ValueError from list.remove().
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: dict, websocket: WebSocket):
        """Send *message* as JSON to a single connection."""
        await websocket.send_json(message)

    async def broadcast(self, message: str):
        """Send *message* as JSON to every tracked connection."""
        # Iterate over a snapshot: every send awaits, so disconnect() can run
        # in between and mutate the live list, which would skip a connection.
        for connection in list(self.active_connections):
            await connection.send_json(message)
app = FastAPI()
# Module-level manager instance used by the endpoint below.
manager = ConnectionManager()


@app.websocket('/chargeStationState')
async def websocket_endpoint(websocket: WebSocket):
    """Echo each client's station name back together with the current time.

    Every received JSON message must contain a ``'station'`` key; the reply
    is ``{'station': <same value>, 'timestamp': time.ctime()}``.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            response = {'station': data['station'], 'timestamp': time.ctime()}
            await manager.send_personal_message(response, websocket)
    except WebSocketDisconnect:
        # The client closed the connection; there is nothing left to send.
        pass
    finally:
        # Untrack the connection on every exit path, not only on a clean
        # disconnect; otherwise an unexpected error (e.g. a message without
        # 'station') would leave a dead socket in the manager.
        manager.disconnect(websocket)
test.py
import websockets
import asyncio
import json
async def main():
    """Send the station message to the server and print the reply, repeatedly.

    Per the websockets documentation, iterating over ``connect()`` yields a
    new connection each time the loop body finishes, so each pass below uses
    its own connection.
    """
    url = 'ws://127.0.0.1:8000/chargeStationState'
    data = json.dumps({'station':'1'})
    async for websocket in websockets.connect(url):
        try:
            await websocket.send(data)
            print(await asyncio.wait_for(websocket.recv(), timeout=10))
        except websockets.ConnectionClosed as exc:
            # The server closed this connection; report it and let the
            # iterator open a new one instead of crashing the client.
            print(f'connection closed: {exc}')
        # Pause between messages (and before reconnecting after a close).
        await asyncio.sleep(2)


if __name__ == '__main__':
    asyncio.run(main())