我可以在 Airflow 中同时设置 DAG 级别和任务级别 on_failure_callbacks 吗?

问题描述 投票:0回答:1

我想设置任务级别和 DAG 级别的 on_failure_callbacks 并让它们执行两个不同的操作。如果配置后不相互覆盖,DAG 级别的失败回调和任务级别的回调是否都会触发?或者任务级别回调会覆盖 DAG 级别回调吗?

文档在这一点上有点令人困惑。

我的定义如下:

with DAG(
        default_args={"has different on_failure_callback"}
        dag_id = "test_dag",
        schedule_interval=None,
        start_date=datetime.datetime(2021, 1, 1),
        catchup=False,
        on_failure_callback=failure_callback_dag
    ) as dag:

failure_callback_dag 调用不同的函数来报告 DAG 运行失败。

callback airflow
1个回答
0
投票

是的,这是可能的。两个

on_failure_callback
功能彼此独立工作。例如:

from airflow import DAG
from airflow.operators.bash import BashOperator


def handle_dag_failure(context):
    print("TODO handle DAG failure")
    print(f"context = {context}")


def handle_task_failure(context):
    print("TODO handle task failure")
    print(f"context = {context}")


with DAG(dag_id="so_79020182", schedule=None, on_failure_callback=handle_dag_failure):
    BashOperator(task_id="test", bash_command="exit 1", on_failure_callback=handle_task_failure)

请注意,两者都采用单个参数(此处名为

context
)。

handle_task_failure
的输出显示在任务日志中。
handle_dag_failure
的输出有点隐蔽,你可以在Airflow的日志文件夹/scheduler/YYYY-MM-DD/yourdagfile.py.log中找到它,或者需要设置日志传送机制。

© www.soinside.com 2019 - 2024. All rights reserved.