我如何与 celerybeat 共享在我的 FastAPI 应用程序中创建的类的实例?

问题描述 投票:0回答:1

我有一个使用

websockets
的应用程序,我在 python
dict
中管理连接。这是我管理我的
websockets
的代码——它在 Docker 容器中运行:

ws_router = APIRouter()

class ConnectionManager:

    def __init__(self):
        self.active_sockets = {}
        self.disconnected_sockets = {}

    async def connect(self, websocket: WebSocket):
        client_ip = websocket.client.host

        self.active_sockets[websocket] = {
            "client_ip": client_ip
        }

        await websocket.send_json({
            "action": "send_id",
            "socket_id": socket_id
        })
        self.print_conn()

    def print_conn(self):
        print(f"active: {self.active_sockets}")
        print(f"disco: {self.disconnected_sockets}")

    def disconnect(self, websocket: WebSocket):

        if websocket in self.active_sockets:
            data = self.active_sockets[websocket]

            self.disconnected_sockets[data["socket_id"]] = data

            del self.active_sockets[websocket]

        self.print_conn()

    def clean_disconnections(self):
        self.print_conn()


manager = ConnectionManager()


@ws_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    ping_task = asyncio.create_task(manager.enviar_ping(websocket))
    while True:
       data = await websocket.receive_json()
       await handle_data(data, websocket)

我需要使用 celery 每小时清理一次连接,所以我配置了 celerybeat:

celery = Celery("celery")

celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")
celery.conf.timezone = 'America/Mexico_City'
celery.conf.enable_utc = False
celery.conf.beat_schedule = {
    'clean_connections': {
        "task": "clean_connections",
        "schedule": crontab(minute=24),
        "options": {'queue': 'cleaner'}
    }
}

#@celery.task(name=)

@shared_task(name='clean_connections')
def clean_connections():
    manager.clean_disconnections()

当我运行我的项目时,我与套接字连接,我的应用程序日志显示连接,但是当 celerybeat 运行时,celery 日志仅显示一个空对象。我如何分享这段记忆?

我尝试更改为任务和共享任务,并使用全局变量,但它仍然打印相同的空对象

python websocket celery fastapi celerybeat
1个回答
0
投票

这就是分布式架构的问题:由于您的 Web 应用程序和工作线程不在同一个进程中运行(或者甚至不一定在同一台机器上),所以您不能。我在这里可以看到两个可行的解决方案:要么在您的应用程序中实现一个关闭连接的 API 端点,要么使用在 FastAPI 进程中运行的东西(类似于

fastapi-utils
中的 repeat_every 函数)。

© www.soinside.com 2019 - 2024. All rights reserved.