我在 Airflow 中有以下 DAG:
validate_and_prepare_config >> skip_detect_task >> [ingest, detect]
detect >> export
ingest >> skip_decrypt_task >> [decrypt, parse]
decrypt >> parse >> vault_transfer >> skip_export_task >> [export, get_pipeline_state]
export >> get_pipeline_state >> post_process
期望的行为是从
vault-transfer
分支到任一
1.
export
,然后get_pipeline_state
或
2.直接到
get_pipeline_state
skip_export
任务包含一个分支运算符,它返回“export
”或“get_pipeline_state
”。当它返回“get_pipeline_state
”时,它会按预期分支到 get_pipeline_state
任务。然而,当它返回“export
”时,DAG仍然跳过export
任务,仍然分支到get_pipeline_state
。
这是我在
skip_export
任务的日志中看到的内容:
{python.py:177} INFO - Done. Returned value was: export
{python.py:211} INFO - Branch callable return export
{skipmixin.py:155} INFO - Following branch export
{skipmixin.py:211} INFO - Skipping tasks []
这些是我在
skip_export
任务中看到的 XCOM 值:
键 | 价值 |
---|---|
返回值 | 出口 |
跳过mixin_key | {'已关注': ['get_pipeline_state', '导出']} |
为什么会发生这种情况以及如何让
skip_export
任务按预期运行?
这是因为默认的触发规则设置为all_success,这意味着只有当其所有前置任务都成功时才会执行该任务。
您的案例如下:
export 是 detect 和 skip_export 的下游。 export任务将显示为已跳过,因为其trigger_rule默认设置为all_success,并且由分支操作引起的跳过(在detect上)向下级联以跳过标记为all_success的任务。
在导出任务上添加
trigger_rule=none_failed_min_one_success
以使其正常工作。
老实说,我不确定你的
get_pipeline_state
任务 - 理论上它应该有同样的问题,但从图片来看似乎工作正常。为了确定起见,您可以仔细检查有或没有调整触发规则的情况。