我有我的API,一些端点需要将请求转发到Celery。想法是拥有特定的 API 服务,基本上只实例化 Celery 客户端并使用 send_task() 方法,以及使用任务的单独服务(workers)。任务定义的代码应位于该辅助服务中。基本上将 celery 应用程序(API)和 celery Worker 分离为两个单独的服务。 我不希望我的 API 了解任何 celery 任务定义,端点只需要使用
celery_client.send_task('some_task', (some_arguments))
。因此,在一项服务上我有我的 API,在其他服务/主机上我有 celery 代码库,我的 celery 工作人员将在其中执行任务。
我发现了这篇很棒的文章,它描述了我想做的事情。 https://medium.com/@tanchinhiong/separating-celery-application-and-worker-in-docker-containers-f70fedb1ba6d 和这篇文章 Celery - 如何从远程计算机发送任务?
我需要有关如何通过 API 创建任务路线的帮助?我期待
celery_client.send_task()
有 queue=
关键字,但事实并非如此。我需要有 2 个队列和两个工作人员来消耗这两个队列中的内容。
对我的工人的命令:
celery -A <path_to_my_celery_file>.celery_client worker --loglevel=info -Q queue_1
celery -A <path_to_my_celery_file>.celery_client worker --loglevel=info -Q queue_2
我还访问了 celery“路由任务”文档,但我仍然不清楚如何建立这种通信。
您的 API 端应容纳路由器。我想这不是问题,因为它只是任务 -> 队列的映射(又名将
task1
发送到 queue1
)。
换句话说,你的
celery_client
应该有task_routes,例如:
task_routes = {
'mytasks.some_task': 'queue_1',
'mytasks.some_other_task': 'queue_2',
}
Celery 确实有一个
queue
参数用于 send_task()
,因为它采用与 apply_async()
相同的 kwargs
https://docs.celeryq.dev/en/stable/reference/celery.html#celery.Celery.send_task https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply_async
celery_client.send_task(
'some_task',
args=(some_arguments,),
queue='your_queue'
)