有没有办法通过airflow UI查明任务已被清除?

问题描述 投票:0回答:1

如何通过airflow UI清除任务的那一刻让某些逻辑起作用?

或者我如何在运行任务时确定它是由于通过airflow UI清除而重新启动的?

我查看了触发器和可用的回调,有不同的,但不是我需要的。

目前我已经找到了只能通过分析日志或者使用Xcom的方式,但是这些解决方案都不是很方便。

python airflow
1个回答
0
投票

根据 Airflow 文档,清除任务时会发生以下情况:

当前任务实例的

try_number
递增,
max_tries
设置为 0,状态设置为
None
,这会导致任务重新运行。

因此,根据您运行逻辑的时间和方式,您可以利用 TaskInstance 的这些属性来确定您的任务是否已清除。我们还必须考虑为计算任务配置的任何“自然”重试。既然您提到使用回调函数,我将展示一个使用其中一个函数的示例,但获取 TaskInstance 对象可能需要一些额外的逻辑,具体取决于运行代码的上下文。如果您需要在任务运行时对其整体逻辑进行分支,那么最好使用 PythonOperator 或利用自定义 Python 代码的类似操作符。

from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator

# Customize your DAG definition here.
dag = DAG()

def success_callback(ti):
    try_number = ti.try_number
    # Get the relative try number compared to the last clear
    relative_try_number = try_number - ti.max_tries - ti.task.retries + 2
    if try_number > 1 and relative_try_number == 1:
        print('This task was just cleared.')
    else:
        print('This is a normal task run.')

dummy_task = DummyOperator(
    task_id='dummy-task',
    dag=dag,
    on_success_callback=success_callback
)

dummy_task

我改编了这个答案中的代码来计算相对尝试次数。

© www.soinside.com 2019 - 2024. All rights reserved.