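I have an Airflow DAG, dag-A, that is triggered from another DAG. Sometimes dag-A is triggered at 4 PM UTC (midnight EST). When that happens, I want it to wait 30 minutes and start running at 16:30 UTC.
In general it should run as soon as it is triggered, but if it is triggered between 16:00 and 16:30 UTC it should wait until 16:30.
I have already done this with sleep, but I am trying to do the same thing with a TimeSensor, because sleep blocks while the sensor would be non-blocking.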
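The rule above can be stated without Airflow at all. The following is only a sketch in plain Python: the helper name seconds_until_release and its window arguments are hypothetical, and it assumes the trigger time is a timezone-aware datetime.

```python
from datetime import datetime, time, timezone


def seconds_until_release(triggered_at, window_start=time(16, 0), release=time(16, 30)):
    """Return how long to hold a run triggered at `triggered_at`.

    Hypothetical helper: 0.0 outside the [window_start, release) UTC window,
    otherwise the number of seconds left until `release` on the same day.
    """
    now_utc = triggered_at.astimezone(timezone.utc)
    if not (window_start <= now_utc.time() < release):
        return 0.0
    release_at = datetime.combine(now_utc.date(), release, tzinfo=timezone.utc)
    return (release_at - now_utc).total_seconds()


# A run triggered at 16:10 UTC is held for 20 minutes; one at 09:00 is not held.
print(seconds_until_release(datetime(2024, 1, 1, 16, 10, tzinfo=timezone.utc)))  # 1200.0
print(seconds_until_release(datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)))    # 0.0
```

Keeping the decision in a pure function like this makes the 16:00 and 16:30 boundaries easy to check before wiring anything into a sensor or operator.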
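Here is my code. It puts me in what looks like an infinite loop: even when the check_time task returns False, the DAG keeps waiting for 16:30. What I want is to run dq_task right away if the trigger time is not between 16:00 and 16:30, and otherwise wait until 16:30 and then run dq_task.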
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.dates import days_ago
from datetime import datetime


def check_time_and_delay():
    # True when the worker's local time is between 16:00 and 16:30
    now = datetime.now()
    return now.hour == 16 and 0 <= now.minute <= 30


with DAG(
    'delayed_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:
    check_time = PythonOperator(
        task_id='check_time',
        python_callable=check_time_and_delay
    )
    # Despite the task name, the target time is 16:30
    wait_until_12_30 = TimeSensor(
        task_id='wait_until_12_30',
        target_time=datetime.strptime('16:30:00', '%H:%M:%S').time(),
        mode='reschedule',
        timeout=1800
    )
    run_dq_check = PythonOperator(
        task_id='run_dq_check',
        python_callable=lambda: print("Running DQ check")
    )
    # Define the task flow
    check_time >> [wait_until_12_30, run_dq_check]
    wait_until_12_30 >> run_dq_check
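One way to reason about the stall is to model what the sensor checks. The sketch below is a simplified stand-in in plain Python, not Airflow's implementation: time_sensor_would_pass is a hypothetical name, and it assumes the sensor is satisfied only once the current time of day is later than target_time, without looking at what check_time returned.

```python
from datetime import time


def time_sensor_would_pass(current, target=time(16, 30)):
    """Hypothetical model of a time-of-day sensor check.

    Assumption: the sensor is satisfied only when the current time of day
    is later than `target`; nothing else is consulted.
    """
    return current > target


# Try a few trigger times: before the window, inside it, and after 16:30.
for trigger in (time(10, 0), time(16, 10), time(17, 0)):
    print(trigger, time_sensor_would_pass(trigger))
```

Under that assumption, a run triggered at 10:00 keeps being rescheduled until either 16:30 arrives or the 1800-second timeout fails the sensor, and run_dq_check stays blocked behind it the whole time.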
Code that works but uses sleep:
from datetime import timedelta
from time import sleep

from airflow.operators.python import PythonOperator


def check_if_midnight_and_wait(execution_date):
    # 04:00 UTC is midnight in US Eastern (daylight) time
    if execution_date.hour == 4 and execution_date.minute < 30:
        wait_time = timedelta(minutes=30 - execution_date.minute)
        print(f"Waiting {wait_time} to start at 00:30 AM EST.")
        # Blocks the worker slot for the whole wait
        sleep(wait_time.total_seconds())


# task:
wait_and_start = PythonOperator(
    task_id='wait_and_start',
    python_callable=check_if_midnight_and_wait,
    provide_context=True  # only needed on Airflow 1.x; deprecated in 2.x
)
wait_and_start >> dq_check
Attempt using BranchPythonOperator:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.dates import days_ago
from datetime import datetime, time


def check_time_and_branch():
    now = datetime.utcnow()
    if now.hour == 16 and 0 <= now.minute < 30:
        return 'wait_until_16_30'
    return 'run_dq_check'


with DAG(
    'delayed_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:
    check_time = BranchPythonOperator(
        task_id='check_time',
        python_callable=check_time_and_branch
    )
    wait_until_16_30 = TimeSensor(
        task_id='wait_until_16_30',
        target_time=time(16, 30),
        mode='reschedule',
        timeout=1800
    )
    run_dq_check = PythonOperator(
        task_id='run_dq_check',
        python_callable=lambda: print("Running DQ check")
    )
    check_time >> [wait_until_16_30, run_dq_check]
    wait_until_16_30 >> run_dq_check
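To check the branching decision without running the DAG, the time can be passed in as an argument. This is a sketch only: choose_branch is a hypothetical variant of check_time_and_branch, and the task ids are the ones used above.

```python
from datetime import datetime, timezone


def choose_branch(now):
    """Return the task_id to follow for a UTC datetime `now`."""
    if now.hour == 16 and 0 <= now.minute < 30:
        return 'wait_until_16_30'
    return 'run_dq_check'


def check_time_and_branch():
    # Thin wrapper for the operator: reads the clock, delegates the decision
    return choose_branch(datetime.now(timezone.utc))
```

Separately, since run_dq_check sits downstream of both branches, it may also need a trigger rule such as none_failed_min_one_success; with the default all_success it would be skipped whenever wait_until_16_30 is skipped.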