如何获取airflow中dag运行的最新执行时间

问题描述 投票:0回答:3

我尝试了下面的代码,但仍然遇到问题

from airflow.models DagModel

def get_latest_execution_date(**kwargs):

session = airflow.settings.Session()

f = open("/home/Insurance/InsuranceDagsTimestamp.txt","w+")

try:
    Insurance_last_dag_run = session.query(DagModel)
    for Insdgrun in Insurance_last_dag_run:
        if Insdgrun is None: 
            f.write(Insdgrun.dag_id+",9999-12-31"+"\n")
        else:
            f.write(Insdgrun.dag_id+","+ Insdgrun.execution_date+"\n")
except:
    session.rollback()
finally:
    session.close()

t1 = PythonOperator(
    task_id='records',
    provide_context=True,
    python_callable=get_latest_execution_date,
    dag=dag)

有什么方法可以修复并获取最新的 dag 运行时信息

airflow
3个回答
14
投票

有多种方法可以获取 DagRun 的最新执行情况。一种方法是利用 Airflow DagRun 模型。

from airflow.models import DagRun

def get_most_recent_dag_run(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    return dag_runs[0] if dag_runs else None


dag_run = get_most_recent_dag_run('fake-dag-id-001')
if dag_run:
    print(f'The most recent DagRun was executed at: {dag_run.execution_date}')

您可以在此处Airflow 文档中找到有关 DagRun 模型及其属性的更多信息。


1
投票

PythonOperator

 
op_args
 参数已模板化。

可调用函数仅将最新执行日期写入文件,因此您可以通过以下方式实现该功能:

def store_last_execution_date(execution_date): '''Appends latest execution date to a file :param execution_date: The last execution date of the DagRun. ''' with open("/home/Insurance/InsuranceDagsTimestamp.txt", "w+") as f: f.write(execution_date) t1 = PythonOperator( task_id="records", provide_context=True, python_callable=store_last_execution_date, op_args=[ "{{dag.get_latest_execution_date()}}", ], dag=dag )
    

0
投票
如果您正在寻找日期本身,就像我到达这里时一样,您可以使用这个(只是对乔希的答案的一个小补充,对此有所帮助):

from airflow.models import DagRun from datetime import datetime def get_last_dag_run_date(dag_id): dag_runs = DagRun.find(dag_id=dag_id) dag_runs.sort(key=lambda x: x.execution_date, reverse=True) date = dag_runs[0].execution_date formatted_date = date.strftime('%m%d%Y') if formatted_date: return formatted_date else: raise ValueError(f"No successful runs found for DAG: {dag_id}")
在这种情况下它会返回 

mmddYYYY


    

© www.soinside.com 2019 - 2024. All rights reserved.