在 Airflow 中动态选择队列

问题描述 投票:0回答:1

是否可以根据运行时条件动态地将队列分配给 Airflow 中的任务?例如,我想根据前一个任务提供的 XCom 值来确定任务的队列?

我正在设计一个由多个 Celery 工作人员组成的系统,我想将特定任务发送给特定工作人员。如果可以的话,如何实现呢?否则,我还能如何解决这个问题?

python rabbitmq airflow celery
1个回答
0
投票

您可以在 celery 中使用手动路由选项

  1. 在 Celery 配置中定义多个队列
from kombu import Queue

app.conf.task_queues = (
    Queue('default', routing_key='default.#'),
    Queue('high_priority', routing_key='high_priority.#'),
    Queue('low_priority', routing_key='low_priority.#'),
)
  1. 调用指定队列的任务:
# Send to high_priority queue
my_task.apply_async(args=['urgent_task'], queue='high_priority')

# Send to low_priority queue
my_task.apply_async(args=['non_urgent_task'], queue='low_priority')
  1. 您可以在启动工作程序时使用 -Q (或 --queues)选项来启动侦听特定队列的工作程序。
celery -A my_app worker -Q high_priority -n worker_high_priority@%h

查看手动路由文档

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.