Celery send_task() 方法

问题描述 投票:0回答:2

我有我的API,一些端点需要将请求转发到Celery。想法是拥有特定的 API 服务,基本上只实例化 Celery 客户端并使用 send_task() 方法,以及使用任务的单独服务(workers)。任务定义的代码应位于该辅助服务中。基本上将 celery 应用程序(API)和 celery Worker 分离为两个单独的服务。 我不希望我的 API 了解任何 celery 任务定义,端点只需要使用

celery_client.send_task('some_task', (some_arguments))
。因此,在一项服务上我有我的 API,在其他服务/主机上我有 celery 代码库,我的 celery 工作人员将在其中执行任务。

我发现了这篇很棒的文章,它描述了我想做的事情。 https://medium.com/@tanchinhiong/separating-celery-application-and-worker-in-docker-containers-f70fedb1ba6d 和这篇文章 Celery - 如何从远程计算机发送任务?

我需要有关如何通过 API 创建任务路线的帮助?我期待

celery_client.send_task()
queue=
关键字,但事实并非如此。我需要有 2 个队列和两个工作人员来消耗这两个队列中的内容。

对我的工人的命令:

celery -A <path_to_my_celery_file>.celery_client worker --loglevel=info -Q queue_1
celery -A <path_to_my_celery_file>.celery_client worker --loglevel=info -Q queue_2

我还访问了 celery“路由任务”文档,但我仍然不清楚如何建立这种通信。

python redis rabbitmq celery
2个回答
3
投票

您的 API 端应容纳路由器。我想这不是问题,因为它只是任务 -> 队列的映射(又名将

task1
发送到
queue1
)。

换句话说,你的

celery_client
应该有task_routes,例如:

task_routes = {
    'mytasks.some_task': 'queue_1',
    'mytasks.some_other_task': 'queue_2',
}

0
投票

Celery 确实有一个

queue
参数用于
send_task()
,因为它采用与
apply_async()

相同的 kwargs

https://docs.celeryq.dev/en/stable/reference/celery.html#celery.Celery.send_task https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply_async

celery_client.send_task(
    'some_task',
    args=(some_arguments,),
    queue='your_queue'
)
© www.soinside.com 2019 - 2024. All rights reserved.