我正在运行气流 dag,并尝试按计划运行它。当调度程序触发 dag 时,dag run_config 的计算结果为 None。
我尝试过这个,但它对我不起作用 - Airflow 如何为 dag_run.conf 设置默认值
我的代码看起来像这样
def parse_conf(**kwargs):
print(f"dag run config is {dag_run.conf}") # prints None if dag is run on scheduler
with DAG(
...
schedule_interval="* * * * *",
params={
"test_param":"test",
},) as dag
config_task = PythonOperator(task_id="parse_config", python_callable=parse_conf)
...
start_task.set_downstream(config_task)
如果手动触发 dag,则配置为所需。
在配置文件中,
dag_run_conf_overrides_params
设置为True
您可以在 dag 代码中使用 python configparser lib: 这是示例代码:
import os
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
import configparser
schedule_time = configparser.ConfigParser()
schedule_time.read('dags/settings.ini')
AutoSet = schedule_time.get('Run Scheduler', 'Timeset')
default_args = {
'owner': 'Dyan',
'start_date': dt.datetime(2023, 8, 29, tzinfo=local_tz),
'retry_delay': dt.timedelta(minutes=5),
'catch_up': False,
'provide_context': True
}
dag = DAG(
dag_id='api_call_scheduler',
default_args=default_args,
description='Api call Dag',
schedule_interval=AutoSet,
params={
"start_date": dt.datetime.strftime(dt.datetime.today().replace(tzinfo=pytz.utc).astimezone(local_tz) - dt.timedelta(1),'%Y-%m-%d'),
"end_date": dt.datetime.strftime(dt.datetime.today().replace(tzinfo=pytz.utc).astimezone(local_tz) - dt.timedelta(1),'%Y-%m-%d'),
"cmd": "ALL"
}
)