我有一个 Dag,其计划间隔为 None。我想通过 TriggerDagRunOperator 在一天内多次触发这个 Dag。
我用schedule_interval“* 1/12 * * *”创建了一个PreDag
在 PreDag 内部,运行 TriggerDagRunOperator 的任务来触发主 Dag。
按计划 PreDag 每天运行两次,第一次当 PreDag 运行时触发 Dag,但第二次当 PreDag 运行时,triggerDagRunOperator 的任务显示错误:
“已存在 dag id {{ dag_id}} 在 {{execution_date}} 的 Dag 运行,运行 ID 为 {{trigger_run_id}}”`
trigger_run = TriggerDagRunOperator(
task_id="main_dag_trigger",
trigger_dag_id=str('DW_Test_TriggerDag'),
pool='branch_pool_limit',
wait_for_completion=True,
poke_interval=20,
trigger_run_id = 'trig__' + str(datetime.now()),
execution_date = '{{ ds }}',
# reset_dag_run = True ,
dag = predag
)`
是否可以使用 TriggerDagRunOperator 在一天内多次触发 dag。
Airflow 使用
execution_date
和 dag_id
作为 dag 运行表的 ID,因此当第二次触发 dag 时,会存在与第一次运行时创建的相同 execution_date
的运行。
为什么会出现这个问题?那是因为您在运行中使用
{{ ds }}
作为 execution_date
:
DAG 运行的逻辑日期为 YYYY-MM-DD。与 {{ dag_run.logic_date | 相同ds }}.
这是您跑步的日期,而不是日期时间,同一天触发的两次跑步的日期是相同的。
您可以通过将
{{ ds }}
替换为 {{ ts }}
来修复它
你可以做
tz = pytz.timezone('UTC')
unique_timestamp = datetime.now(tz)
然后进入
trigger_dbt = TriggerDagRunOperator(
...
execution_date=unique_timestamp,...
它可以正常工作,并且您可以保留所有依赖 dag 运行的历史记录。