是否可以根据运行时条件动态地将队列分配给 Airflow 中的任务?例如,我想根据前一个任务提供的 XCom 值来确定任务的队列?
我正在设计一个由多个 Celery 工作人员组成的系统,我想将特定任务发送给特定工作人员。如果可以的话,如何实现呢?否则,我还能如何解决这个问题?
您可以在 celery 中使用手动路由选项
from kombu import Queue
app.conf.task_queues = (
Queue('default', routing_key='default.#'),
Queue('high_priority', routing_key='high_priority.#'),
Queue('low_priority', routing_key='low_priority.#'),
)
# Send to high_priority queue
my_task.apply_async(args=['urgent_task'], queue='high_priority')
# Send to low_priority queue
my_task.apply_async(args=['non_urgent_task'], queue='low_priority')
celery -A my_app worker -Q high_priority -n worker_high_priority@%h
查看手动路由文档