假设我有两个DAG,其中dag2使用TriggerDagRunOperator执行dag1作为其流程的一部分,如下所示:
现在让我们说dag2每天下午5点安排一次。有没有办法在我运行dag1时获取dag2(父DAG)的执行时间戳?是否有任何内置参数保存该值?
如果发生了某些事情并且dag2比平时更晚被触发,那么让我们说当天下午6点,那么我仍然希望得到原始的调度时间 - 也就是我在dag1时的下午5点。
将函数传递给python_callable
的TriggerDagRunOperator
参数,该参数将execution_date
注入触发的DAG:
def inject_execution_date(context, dag_run_obj):
dag_run_obj.payload = {"parent_execution_date": context["execution_date"]}
return dag_run_obj
[...]
trigger_dro = TriggerDagRunOperator(python_callable=inject_execution_date, [...])
您可以使用context["conf"]["parent_execution_date"]
在子DAG中访问它