使用 dag 配置参数安排气流

问题描述 投票:0回答:1

我正在运行气流 dag,并尝试按计划运行它。当调度程序触发 dag 时,dag run_config 的计算结果为 None。

我尝试过这个,但它对我不起作用 - Airflow 如何为 dag_run.conf 设置默认值

我的代码看起来像这样

def parse_conf(**kwargs):
  print(f"dag run config is {dag_run.conf}") # prints None if dag is run on scheduler

with DAG(
...
schedule_interval="* * * * *",
params={
        "test_param":"test",
},) as dag

  config_task = PythonOperator(task_id="parse_config", python_callable=parse_conf)
  ...
  start_task.set_downstream(config_task)

如果手动触发 dag,则配置为所需。

在配置文件中,

dag_run_conf_overrides_params
设置为True

python airflow scheduling
1个回答
0
投票

您可以在 dag 代码中使用 python configparser lib: 这是示例代码:

import os
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
import configparser

schedule_time = configparser.ConfigParser()
schedule_time.read('dags/settings.ini')
AutoSet = schedule_time.get('Run Scheduler', 'Timeset')

default_args = {
    'owner': 'Dyan',
    'start_date': dt.datetime(2023, 8, 29, tzinfo=local_tz),
    'retry_delay': dt.timedelta(minutes=5),
    'catch_up': False,
    'provide_context': True
}

dag = DAG(
    dag_id='api_call_scheduler',
    default_args=default_args,
    description='Api call Dag',
    schedule_interval=AutoSet,
    params={
        "start_date": dt.datetime.strftime(dt.datetime.today().replace(tzinfo=pytz.utc).astimezone(local_tz) - dt.timedelta(1),'%Y-%m-%d'),
        "end_date": dt.datetime.strftime(dt.datetime.today().replace(tzinfo=pytz.utc).astimezone(local_tz) - dt.timedelta(1),'%Y-%m-%d'),
        "cmd": "ALL"
    }   
)
© www.soinside.com 2019 - 2024. All rights reserved.