如何通过airflow UI清除任务的那一刻让某些逻辑起作用?
或者我如何在运行任务时确定它是由于通过airflow UI清除而重新启动的?
我查看了触发器和可用的回调,有不同的,但不是我需要的。
目前我已经找到了只能通过分析日志或者使用Xcom的方式,但是这些解决方案都不是很方便。
根据 Airflow 文档,清除任务时会发生以下情况:
当前任务实例的
递增,try_number
设置为 0,状态设置为max_tries
,这会导致任务重新运行。None
因此,根据您运行逻辑的时间和方式,您可以利用 TaskInstance 的这些属性来确定您的任务是否已清除。我们还必须考虑为计算任务配置的任何“自然”重试。既然您提到使用回调函数,我将展示一个使用其中一个函数的示例,但获取 TaskInstance 对象可能需要一些额外的逻辑,具体取决于运行代码的上下文。如果您需要在任务运行时对其整体逻辑进行分支,那么最好使用 PythonOperator 或利用自定义 Python 代码的类似操作符。
from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
# Customize your DAG definition here.
dag = DAG()
def success_callback(ti):
try_number = ti.try_number
# Get the relative try number compared to the last clear
relative_try_number = try_number - ti.max_tries - ti.task.retries + 2
if try_number > 1 and relative_try_number == 1:
print('This task was just cleared.')
else:
print('This is a normal task run.')
dummy_task = DummyOperator(
task_id='dummy-task',
dag=dag,
on_success_callback=success_callback
)
dummy_task
我改编了这个答案中的代码来计算相对尝试次数。