我注意到,对于计划任务,执行日期是根据
设置为过去的Airflow 是作为 ETL 需求的解决方案而开发的。在 ETL 世界中, 您通常会汇总数据。所以,如果我想总结数据 2016-02-19,我会在 2016-02-20 GMT 午夜进行,即 2016 年 2 月 19 日的所有数据均可用后。
但是,当一个 dag 触发另一个 dag 时,执行时间将设置为 now()。
有没有办法让触发的DAG与触发DAG的执行时间相同?当然,我可以重写模板并使用昨天_ds,但是,这是一个棘手的解决方案。
以下类对
TriggerDagRunOperator
进行了扩展,以允许将执行日期作为字符串传递,然后将其转换回日期时间。这有点老套,但这是我发现完成工作的唯一方法。
from datetime import datetime
import logging
from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder
class MMTTriggerDagRunOperator(TriggerDagRunOperator):
"""
MMT-patched for passing explicit execution date
(otherwise it's hard to hook the datetime.now() date).
Use when you want to explicity set the execution date on the target DAG
from the controller DAG.
Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e
Parameters
------------------
execution_date: str
the custom execution date (jinja'd)
Usage Example:
-------------------
my_dag_trigger_operator = MMTTriggerDagRunOperator(
execution_date="{{execution_date}}"
task_id='my_dag_trigger_operator',
trigger_dag_id='my_target_dag_id',
python_callable=lambda: random.getrandbits(1),
params={},
dag=my_controller_dag
)
"""
template_fields = ('execution_date',)
def __init__(
self, trigger_dag_id, python_callable, execution_date,
*args, **kwargs
):
self.execution_date = execution_date
super(MMTTriggerDagRunOperator, self).__init__(
trigger_dag_id=trigger_dag_id, python_callable=python_callable,
*args, **kwargs
)
def execute(self, context):
run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
dro = self.python_callable(context, dro)
if dro:
session = settings.Session()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
execution_date=self.execution_date,
conf=dro.payload,
external_trigger=True)
logging.info("Creating DagRun {}".format(dr))
session.add(dr)
session.commit()
session.close()
else:
logging.info("Criteria not met, moving on")
使用此功能而不设置
execution_date=now()
时可能会遇到一个问题:如果您尝试使用相同的 execution_date
启动 dag 两次,您的操作员将抛出 mysql 错误。这是因为 execution_date
和 dag_id
用于创建行索引,无法插入具有相同索引的行。
无论如何,我想不出你想要在生产中运行两个具有相同
execution_date
的相同 dags 的原因,但这是我在测试时遇到的事情,你不应该对此感到惊慌。只需清除旧作业或使用不同的日期时间即可。
TriggerDagRunOperator
现在有一个execution_date
参数来设置触发运行的执行日期。
不幸的是,该参数不在模板字段中。
如果将其添加到模板字段(或者如果您覆盖运算符并更改 template_fields 值),则可以像这样使用它:
my_trigger_task= TriggerDagRunOperator(task_id='my_trigger_task',
trigger_dag_id="triggered_dag_id",
python_callable=conditionally_trigger,
execution_date= '{{execution_date}}',
dag=dag)
尚未发布,但您可以在此处查看来源: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/dagrun_operator.py
我对 MMTTriggerDagRunOperator 进行了一些改进。该函数检查dag_run是否已经存在,如果找到,则使用airflow的clear函数重新启动dag。这使我们能够在 dag 之间创建依赖关系,因为将执行日期移动到触发的 dag 的可能性开启了整个宇宙的惊人可能性。我想知道为什么这不是气流中的默认行为。
def execute(self, context):
run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
dro = self.python_callable(context, dro)
if dro:
session = settings.Session()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
if not trigger_dag.get_dagrun( self.execution_date ):
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
execution_date=self.execution_date,
conf=dro.payload,
external_trigger=True
)
logging.info("Creating DagRun {}".format(dr))
session.add(dr)
session.commit()
else:
trigger_dag.clear(
start_date = self.execution_date,
end_date = self.execution_date,
only_failed = False,
only_running = False,
confirm_prompt = False,
reset_dag_runs = True,
include_subdags= False,
dry_run = False
)
logging.info("Cleared DagRun {}".format(trigger_dag))
session.close()
else:
logging.info("Criteria not met, moving on")
airflow 的实验性 API 部分提供了一个函数,允许您触发具有特定执行日期的 dag。
https://github.com/apache/incubator-airflow/blob/master/airflow/api/common/experimental/trigger_dag.py
您可以将此函数作为PythonOperator的一部分来调用并达到目的。
所以它看起来像
from airflow.api.common.experimental.trigger_dag import trigger_dag
trigger_operator=PythonOperator(task_id='YOUR_TASK_ID',
python_callable=trigger_dag,
op_args=['dag_id'],
op_kwargs={'execution_date': datetime.now()})
在 Airflow 2 中,您只需将逻辑日期参数添加到 TriggerDagRunOperator 即可。如果您需要的话,我还包含了 DAG 参数
param1
:
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
@dag(
dag_id="my_dag",
schedule_interval="0 0 * * *",
start_date=datetime(2024, 1, 1),
catchup=False,
)
def dag_definition():
trigger_run_task = TriggerDagRunOperator(
task_id="trigger_other_dag",
trigger_dag_id="my_other_dag",
logical_date="{{logical_date}}",
conf={"param1": "my_param_value"},
)
dag = dag_definition()