Airflow DAG: if triggered at a certain time, wait for a set period before running


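I have an Airflow DAG, dag-A, that is triggered from another DAG. Sometimes dag-A is triggered at 16:00 UTC (noon US Eastern Daylight Time). When it is triggered at that time, I want it to wait 30 minutes and then start running at 16:30 UTC.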

In general it should run as soon as it is triggered, but if it is triggered between 16:00 and 16:30 UTC, it should wait until 16:30.
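The rule above can be stated as a small function that does not depend on Airflow. This is a sketch rather than code from the question: the name `delay_before_run` and the window constants are hypothetical, and it assumes the trigger time arrives as a timezone-aware datetime (a naive one would be treated as local time by `astimezone`).

```python
from datetime import datetime, time, timedelta, timezone

# Hypothetical constants for the window described above: triggers in [16:00, 16:30) UTC are held back.
WINDOW_START = time(16, 0)
WINDOW_END = time(16, 30)


def delay_before_run(now: datetime) -> timedelta:
    """Return how long a run triggered at `now` should wait (zero outside the window)."""
    now = now.astimezone(timezone.utc)  # normalise to UTC; `now` must be timezone-aware
    if WINDOW_START <= now.time() < WINDOW_END:
        release = now.replace(hour=WINDOW_END.hour, minute=WINDOW_END.minute,
                              second=0, microsecond=0)
        return release - now
    return timedelta(0)


if __name__ == "__main__":
    print(delay_before_run(datetime(2024, 5, 1, 16, 10, 45, tzinfo=timezone.utc)))  # 0:19:15
```

Using a half-open interval means a trigger at exactly 16:30 runs immediately, and unlike a minute-based calculation the seconds are taken into account.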

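I have already done this with a sleep call, but I'm trying to achieve the same thing with a TimeSensor, because sleep blocks the worker while it waits, whereas a sensor in reschedule mode frees its slot between checks.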

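Here is my code. It ends up waiting endlessly: even when the check_time task returns False, the DAG still waits for 16:30. What I want is: if the trigger time is not between 16:00 and 16:30, run dq_task right away; if it is, wait until 16:30 and then run dq_task.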

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.dates import days_ago
from datetime import datetime

def check_time_and_delay():
    # The window is defined in UTC, so compare against UTC rather than local time
    now = datetime.utcnow()
    return now.hour == 16 and 0 <= now.minute < 30

with DAG(
    'delayed_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:

    check_time = PythonOperator(
        task_id='check_time',
        python_callable=check_time_and_delay
    )

    wait_until_12_30 = TimeSensor(
        task_id='wait_until_12_30',
        target_time=datetime.strptime('16:30:00', '%H:%M:%S').time(),
        mode='reschedule',
        timeout=1800 
    )

    run_dq_check = PythonOperator(
        task_id='run_dq_check',
        python_callable=lambda: print("Running DQ check")
    )

    # Define the task flow
    check_time >> [wait_until_12_30, run_dq_check]
    wait_until_12_30 >> run_dq_check
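Why the DAG above keeps waiting: a PythonOperator's return value is only pushed to XCom; it does not decide which downstream tasks run. run_dq_check keeps the default all_success trigger rule and sits downstream of the sensor, so it always waits for the sensor, and in Airflow 2 TimeSensor only succeeds once the current time (in the DAG's timezone) is later than target_time. The sketch below is a plain-Python model of that condition, not Airflow's implementation: the function names are hypothetical and poke_interval is ignored. It shows how long the sensor would wait for a given trigger time and whether the 1800-second timeout fires first.

```python
from datetime import datetime, time, timedelta, timezone

TARGET = time(16, 30)              # target_time used by the sensor in the question
TIMEOUT = timedelta(seconds=1800)  # timeout=1800 in the question


def sensor_condition(now: datetime, target: time = TARGET) -> bool:
    # Simplified model of TimeSensor's check: succeed once wall-clock time is past target.
    return now.time() > target


def time_until_success(now: datetime, target: time = TARGET) -> timedelta:
    """How long until the modelled condition holds (on the same calendar day)."""
    if sensor_condition(now, target):
        return timedelta(0)
    target_dt = datetime.combine(now.date(), target, tzinfo=now.tzinfo)
    return target_dt - now


def outcome(now: datetime) -> str:
    wait = time_until_success(now)
    return "times out" if wait > TIMEOUT else f"succeeds after {wait}"


if __name__ == "__main__":
    for hour in (10, 16, 17):
        trigger = datetime(2024, 5, 1, hour, 0, tzinfo=timezone.utc)
        print(trigger.time(), outcome(trigger))
```

A trigger at 10:00 UTC leaves the sensor six and a half hours short of its target, so it times out after 30 minutes and run_dq_check never runs, regardless of what check_time returned.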

Code that works, but uses sleep:

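from datetime import timedelta
from time import sleep

from airflow.operators.python import PythonOperator

def check_if_midnight_and_wait(execution_date):
    # Hold the run back if it was triggered between 16:00 and 16:30 UTC
    if execution_date.hour == 16 and execution_date.minute < 30:
        wait_time = timedelta(minutes=30 - execution_date.minute)
        print(f"Waiting {wait_time} to start at 16:30 UTC.")
        sleep(wait_time.total_seconds())  # blocks the worker slot while waiting

# task (inside the `with DAG(...)` block):
wait_and_start = PythonOperator(
    task_id='wait_and_start',
    python_callable=check_if_midnight_and_wait,
    # Airflow 2 passes context values such as execution_date to the callable
    # automatically; provide_context=True was only needed on Airflow 1.x
)

wait_and_start >> dq_check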
Answer:

Try using BranchPythonOperator:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.dates import days_ago
from datetime import datetime, time

def check_time_and_branch():
    now = datetime.utcnow()
    if now.hour == 16 and 0 <= now.minute < 30:
        return 'wait_until_16_30'
    return 'run_dq_check'

with DAG(
    'delayed_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:

    check_time = BranchPythonOperator(
        task_id='check_time',
        python_callable=check_time_and_branch
    )

    wait_until_16_30 = TimeSensor(
        task_id='wait_until_16_30',
        target_time=time(16, 30),
        mode='reschedule',
        timeout=1800 
    )

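    run_dq_check = PythonOperator(
        task_id='run_dq_check',
        python_callable=lambda: print("Running DQ check"),
        # Downstream of both check_time and the sensor: with the default all_success
        # rule it would be skipped whenever the sensor branch is skipped.
        # (Airflow 2.2+; older releases call this rule 'none_failed_or_skipped')
        trigger_rule='none_failed_min_one_success',
    )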

    check_time >> [wait_until_16_30, run_dq_check]
    wait_until_16_30 >> run_dq_check
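The branching behaviour can be checked without running Airflow. The sketch below is a simplified, hypothetical model: `choose_branch` is `check_time_and_branch` with the current time passed in so it can be tested, and `join_state` encodes only the parts of two trigger rules that matter here (a skipped upstream task causes an `all_success` task to be skipped as well; `none_failed_min_one_success` runs if nothing upstream failed and at least one upstream task succeeded). It is not Airflow's scheduler logic, and it assumes the sensor succeeds whenever it is the chosen branch.

```python
from datetime import datetime, time, timezone
from typing import Sequence


def choose_branch(now: datetime) -> str:
    # Same decision as check_time_and_branch, with `now` passed in for testing.
    now = now.astimezone(timezone.utc)
    if time(16, 0) <= now.time() < time(16, 30):
        return "wait_until_16_30"
    return "run_dq_check"


def join_state(trigger_rule: str, upstream: Sequence[str]) -> str:
    """Simplified model of how run_dq_check's trigger rule resolves.

    `upstream` holds the final states of its upstream tasks:
    "success", "skipped" or "failed".
    """
    if trigger_rule == "all_success":
        if "failed" in upstream:
            return "upstream_failed"
        if "skipped" in upstream:
            return "skipped"  # skips cascade through all_success
        return "run"
    if trigger_rule == "none_failed_min_one_success":
        if "failed" in upstream:
            return "upstream_failed"
        return "run" if "success" in upstream else "skipped"
    raise ValueError(f"rule not modelled: {trigger_rule}")


def dq_check_state(now: datetime, trigger_rule: str) -> str:
    branch = choose_branch(now)
    # check_time itself succeeds; the sensor is skipped unless it was the chosen branch.
    sensor = "success" if branch == "wait_until_16_30" else "skipped"
    return join_state(trigger_rule, ["success", sensor])


if __name__ == "__main__":
    morning = datetime(2024, 5, 1, 9, 0, tzinfo=timezone.utc)
    for rule in ("all_success", "none_failed_min_one_success"):
        print(rule, dq_check_state(morning, rule))
```

In this model, with the default all_success rule run_dq_check ends up skipped whenever the trigger falls outside the window, which is the common case; none_failed_min_one_success lets it run on both paths.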
© www.soinside.com 2019 - 2024. All rights reserved.