我想设置任务级别和 DAG 级别的 on_failure_callbacks 并让它们执行两个不同的操作。如果配置后不相互覆盖,DAG 级别的失败回调和任务级别的回调是否都会触发?或者任务级别回调会覆盖 DAG 级别回调吗?
文档在这一点上有点令人困惑。
我的定义如下:
with DAG(
default_args={"has different on_failure_callback"}
dag_id = "test_dag",
schedule_interval=None,
start_date=datetime.datetime(2021, 1, 1),
catchup=False,
on_failure_callback=failure_callback_dag
) as dag:
failure_callback_dag 调用不同的函数来报告 DAG 运行失败。
是的,这是可能的。两个
on_failure_callback
功能彼此独立工作。例如:
from airflow import DAG
from airflow.operators.bash import BashOperator
def handle_dag_failure(context):
print("TODO handle DAG failure")
print(f"context = {context}")
def handle_task_failure(context):
print("TODO handle task failure")
print(f"context = {context}")
with DAG(dag_id="so_79020182", schedule=None, on_failure_callback=handle_dag_failure):
BashOperator(task_id="test", bash_command="exit 1", on_failure_callback=handle_task_failure)
请注意,两者都采用单个参数(此处名为
context
)。
handle_task_failure
的输出显示在任务日志中。 handle_dag_failure
的输出有点隐蔽,你可以在Airflow的日志文件夹/scheduler/YYYY-MM-DD/yourdagfile.py.log中找到它,或者需要设置日志传送机制。