我尝试了下面的代码,但仍然遇到问题
from airflow import settings
from airflow.models import DagModel
def get_latest_execution_date(**kwargs):
    """Write the latest execution date of every registered DAG to a file.

    One ``<dag_id>,<execution_date>`` line is written per ``DagModel`` row.
    A DAG without any run gets the placeholder date ``9999-12-31``.

    :param kwargs: keyword arguments supplied by the operator (unused).
    :raises Exception: any query or I/O error is re-raised after the session
        has been rolled back, so the failure is visible to the caller instead
        of being silently swallowed.
    """
    session = settings.Session()
    try:
        # ``with`` guarantees the file is flushed and closed even on error.
        with open("/home/Insurance/InsuranceDagsTimestamp.txt", "w") as f:
            for dag_model in session.query(DagModel):
                # A DagModel row carries no execution date itself; the date
                # lives on the DAG's most recent DagRun.
                last_run = dag_model.get_last_dagrun(
                    session=session,
                    include_externally_triggered=True,
                )
                if last_run is None:
                    f.write(dag_model.dag_id + ",9999-12-31" + "\n")
                else:
                    # execution_date is a datetime and must be converted
                    # before it can be joined with the dag_id string.
                    run_date = last_run.execution_date.isoformat()
                    f.write(f"{dag_model.dag_id},{run_date}\n")
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# Task that runs get_latest_execution_date as part of the DAG.
t1 = PythonOperator(
    dag=dag,
    task_id="records",
    python_callable=get_latest_execution_date,
    provide_context=True,
)
有什么方法可以修复这个问题,并获取每个 DAG 最近一次运行的时间信息?
有多种方法可以获取 DagRun 的最新执行情况。一种方法是利用 Airflow DagRun 模型。
from airflow.models import DagRun
def get_most_recent_dag_run(dag_id):
    """Return the DagRun of ``dag_id`` with the greatest execution date.

    :param dag_id: identifier of the DAG whose runs are looked up.
    :return: the most recent ``DagRun``, or ``None`` when no run exists.
    """
    runs = DagRun.find(dag_id=dag_id)
    if not runs:
        return None
    # Pick the run with the latest execution date.
    return max(runs, key=lambda run: run.execution_date)
# Example usage: look up the newest run of a DAG and report when it executed.
dag_run = get_most_recent_dag_run("fake-dag-id-001")
if dag_run:
    print(
        f'The most recent DagRun was executed at: {dag_run.execution_date}'
    )
`PythonOperator` 的 `op_args` 参数支持模板化。您的可调用函数只是把最新的执行日期写入文件,因此可以通过以下方式实现该功能:
def store_last_execution_date(execution_date):
    """Write the latest execution date to the timestamp file.

    The file is opened in ``"w"`` mode, so any previous content is replaced
    rather than appended to.

    :param execution_date: The last execution date of the DagRun; a ``str``
        or any object whose ``str()`` form should be stored.
    """
    with open("/home/Insurance/InsuranceDagsTimestamp.txt", "w") as f:
        # str() keeps the call working when a datetime is passed directly
        # instead of an already-rendered string.
        f.write(str(execution_date))
# provide_context is deliberately not set: with it, Airflow 1.10 passes the
# whole task context as keyword arguments, and its ``execution_date`` key
# would collide with the positional value supplied through op_args.
t1 = PythonOperator(
    task_id="records",
    python_callable=store_last_execution_date,
    # op_args is a templated field, so the Jinja expression is rendered
    # to a string before the callable is invoked.
    op_args=[
        "{{dag.get_latest_execution_date()}}",
    ],
    dag=dag,
)
from airflow.models import DagRun
from datetime import datetime
def get_last_dag_run_date(dag_id):
    """Return the latest execution date of ``dag_id`` formatted as mmddYYYY.

    :param dag_id: identifier of the DAG whose runs are looked up.
    :return: the most recent execution date as a ``'%m%d%Y'`` string.
    :raises ValueError: if no DagRun exists for ``dag_id``.
    """
    dag_runs = DagRun.find(dag_id=dag_id)
    if not dag_runs:
        # Fail with a descriptive error instead of an IndexError on an
        # empty list.
        # NOTE(review): the message says "successful", but the lookup does
        # not filter by run state — confirm which one is intended.
        raise ValueError(f"No successful runs found for DAG: {dag_id}")
    latest_run = max(dag_runs, key=lambda run: run.execution_date)
    return latest_run.execution_date.strftime('%m%d%Y')
在这种情况下,函数会返回 mmddYYYY 格式的日期字符串(例如 12312023)。