使用气流任务测试 [dag_id] [task_id] 任务可以正常运行,但同样的事情在气流 UI 中不起作用

问题描述 投票:0回答:1

我已经在本地主机中创建了一个气流服务器,但是当我从气流用户界面运行它时,dag 没有完成任务。它只是保持运行状态,然后 30 秒后调度程序心跳也停止。

但是相同的代码按照预期使用 cmd 中的气流任务测试运行得非常好

从用户界面运行时,我只在日志中得到它,然后它就挂起 2024-10-17T22:21:29.795+0530] {base.py:84} 信息 - 检索连接“api_posts”

在这行日志之后不再继续前进,永远运行,既不失败也不成功。

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
import json

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 14),
}

dag = DAG(
    'my_dag_three',
    default_args=default_args,
    schedule_interval='@daily',
)

task_get_posts = SimpleHttpOperator(
    task_id="get_posts",
    http_conn_id="api_posts",
    endpoint="api/chat.postMessage",
    method="POST",
    headers={
        "Authorization": "Bearer #token",
        "Content-Type": "application/json"
    },
    data=json.dumps({
        "channel": "#channel",
        "text": "Hello from airflow"
    }),
    response_filter=lambda response: json.loads(response.text),
    log_response=True,
    dag=dag,
)

task_get_posts

以上是我的 dag 代码

我正在等待成功消息。我有正确的令牌和正确的频道 ID。这个 api 使用 postman 可以正常工作,但使用 dag 似乎不起作用

airflow slack-api airflow-2.x airflow-webserver
1个回答
0
投票

您是否尝试过修改 DAG 以包含超时和重试参数,类似这样?

     task_get_posts = SimpleHttpOperator(
     task_id="get_posts",
     http_conn_id="api_posts",
     endpoint="api/chat.postMessage",
     method="POST",
     headers={
         "Authorization": "Bearer #token",
         "Content-Type": "application/json"
     },
     data=json.dumps({
         "channel": "#channel",
         "text": "Hello from airflow"
     }),
     response_filter=lambda response: json.loads(response.text),
     log_response=True,
     execution_timeout=timedelta(seconds=60),  # Add timeout
     retries=3,  # Add retries
     retry_delay=timedelta(seconds=5),  # Add retry delay
     dag=dag,
 )
 
© www.soinside.com 2019 - 2024. All rights reserved.