我有一个使用
websockets
的应用程序,我在 python dict
中管理连接。这是我管理我的 websockets
的代码——它在 Docker 容器中运行:
ws_router = APIRouter()
class ConnectionManager:
def __init__(self):
self.active_sockets = {}
self.disconnected_sockets = {}
async def connect(self, websocket: WebSocket):
client_ip = websocket.client.host
self.active_sockets[websocket] = {
"client_ip": client_ip
}
await websocket.send_json({
"action": "send_id",
"socket_id": socket_id
})
self.print_conn()
def print_conn(self):
print(f"active: {self.active_sockets}")
print(f"disco: {self.disconnected_sockets}")
def disconnect(self, websocket: WebSocket):
if websocket in self.active_sockets:
data = self.active_sockets[websocket]
self.disconnected_sockets[data["socket_id"]] = data
del self.active_sockets[websocket]
self.print_conn()
def clean_disconnections(self):
self.print_conn()
manager = ConnectionManager()
@ws_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
ping_task = asyncio.create_task(manager.enviar_ping(websocket))
while True:
data = await websocket.receive_json()
await handle_data(data, websocket)
我需要使用 celery 每小时清理一次连接,所以我配置了 celerybeat:
celery = Celery("celery")
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")
celery.conf.timezone = 'America/Mexico_City'
celery.conf.enable_utc = False
celery.conf.beat_schedule = {
'clean_connections': {
"task": "clean_connections",
"schedule": crontab(minute=24),
"options": {'queue': 'cleaner'}
}
}
#@celery.task(name=)
@shared_task(name='clean_connections')
def clean_connections():
manager.clean_disconnections()
当我运行我的项目时,我与套接字连接,我的应用程序日志显示连接,但是当 celerybeat 运行时,celery 日志仅显示一个空对象。我如何分享这段记忆?
我尝试更改为任务和共享任务,并使用全局变量,但它仍然打印相同的空对象
这就是分布式架构的问题:由于您的 Web 应用程序和工作线程不在同一个进程中运行(或者甚至不一定在同一台机器上),所以您不能。我在这里可以看到两个可行的解决方案:要么在您的应用程序中实现一个关闭连接的 API 端点,要么使用在 FastAPI 进程中运行的东西(类似于
fastapi-utils中的
repeat_every
函数)。