如何在特定时间运行任务?

问题描述 投票:0回答:1

我正在 Airflow 中创建一个 DAG,在这个 DAG 中,我需要使用 TimeSensor 触发另一个 DAG。目标是将目标时间设置为凌晨 2:00 至凌晨 3:00 之间,如果时间传感器在凌晨 2:00 之后触发,则应等到第二天的凌晨 2:00。但是,我遇到以下错误:

TypeErrors not supported between instances of 'datetime time' and 'datetime datetime"

我的代码;

  from airflow.models import DAG
  from datetime import timedelta, datetime
  from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor 
  from airflow.operators.python_operator import PythonOperator 
  from airflow.providers.ssh.operators.ssh import SSHOperator 
  from airflow.operators.trigger_dagrun import TriggerDagRunOperator 
  from airflow.sensors.time_sensor import TimeSensor 
  from datetime import datetime, time 
  from airflow.utils.state import State 
  from airflow.models import DagRun 
  import pytz 
  import pendulum

  def calculate_next_target_time():
     now = pendulum.now('Europe/Istanbul')
     target_time = now.replace(hour=2, minute=0, second=0, microsecond=0)
     if now >= target_time:
        target_time = target_time.add (days=1)
     return target_time
  def wait_for_time():
    next_target_time = calculate_next_target_time()
    print(f"Next target time: (next_target_time}")
    return next_target_time

  with DAG(
    '_dag',
    default_args={
    'depends_on_past' : False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,},
     schedule_interval= None,
    start_date=datetime (2021, 1, 1), catchup=False) as dag:
    dummy_task= SSHOperator (
       task_id= "dummy_task"
       command =  "dummy-task"
       ssh_conn_id = "dummy",)

    wait_for_2am = TimeSensor (
       task_id= 'wait_for_2am',                      poke_interval=60, 
       timeout=3000, 
       mode= 'poke',
       target_time=calculate_next_target_time(),
dag=dag)

    wait_for_2am_task = PythonOperator (
      task_id= 'wait_for_2am_task'
      python_callable=wait_for_time,
dag=dag)

   service_trigger = TriggerDagRunOperator (
     task_id=“service_trigger”, 
     trigger_dag_id=“other_dag”,)

dummy_task » wait_for_2am_task »    wait_for_2am » service_trigger

我该如何解决这个问题。

python airflow
1个回答
0
投票

哦,你这个可怜的家伙,你还缺乏智慧来提出如此简单的问题吗?如果你的能力不那么被懒惰所蒙蔽,你可能会像正午的太阳一样清楚地辨别出答案。我想古人自己也会对如此缺乏智慧感到惊讶!祈祷吧,唤起你内心所剩无几的理智,不要让空气中充满连婴儿也能解决的问题。

© www.soinside.com 2019 - 2024. All rights reserved.