我们如何使用 TriggerDagRunOperator 触发单个气流 dag 倍增器次数?

问题描述 投票:0回答:2

我有一个 Dag,其计划间隔为 None。我想通过 TriggerDagRunOperator 在一天内多次触发这个 Dag。

我用schedule_interval“* 1/12 * * *”创建了一个PreDag
在 PreDag 内部,运行 TriggerDagRunOperator 的任务来触发主 Dag。
按计划 PreDag 每天运行两次,第一次当 PreDag 运行时触发 Dag,但第二次当 PreDag 运行时,triggerDagRunOperator 的任务显示错误: “已存在 dag id {{ dag_id}} 在 {{execution_date}} 的 Dag 运行,运行 ID 为 {{trigger_run_id}}”`

trigger_run = TriggerDagRunOperator(
                task_id="main_dag_trigger",
                trigger_dag_id=str('DW_Test_TriggerDag'),  
                pool='branch_pool_limit', 
                wait_for_completion=True, 
                poke_interval=20, 
                trigger_run_id = 'trig__' + str(datetime.now()),
                execution_date = '{{ ds }}',
                # reset_dag_run = True ,
                dag = predag
            )`

是否可以使用 TriggerDagRunOperator 在一天内多次触发 dag。

airflow
2个回答
0
投票

Airflow 使用

execution_date
dag_id
作为 dag 运行表的 ID,因此当第二次触发 dag 时,会存在与第一次运行时创建的相同
execution_date
的运行。

为什么会出现这个问题?那是因为您在运行中使用

{{ ds }}
作为
execution_date

DAG 运行的逻辑日期为 YYYY-MM-DD。与 {{ dag_run.logic_date | 相同ds }}.

这是您跑步的日期,而不是日期时间,同一天触发的两次跑步的日期是相同的。

您可以通过将

{{ ds }}
替换为
{{ ts }}

来修复它

0
投票

你可以做

tz = pytz.timezone('UTC')
unique_timestamp = datetime.now(tz)

然后进入

trigger_dbt = TriggerDagRunOperator(
...
   execution_date=unique_timestamp,...

它可以正常工作,并且您可以保留所有依赖 dag 运行的历史记录。

© www.soinside.com 2019 - 2024. All rights reserved.