我已经在本地主机中创建了一个气流服务器,但是当我从气流用户界面运行它时,dag 没有完成任务。它只是保持运行状态,然后 30 秒后调度程序心跳也停止。
但是相同的代码按照预期使用 cmd 中的气流任务测试运行得非常好
从用户界面运行时,我只在日志中得到它,然后它就挂起 2024-10-17T22:21:29.795+0530] {base.py:84} 信息 - 检索连接“api_posts”
在这行日志之后不再继续前进,永远运行,既不失败也不成功。
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
import json
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 10, 14),
}
dag = DAG(
'my_dag_three',
default_args=default_args,
schedule_interval='@daily',
)
task_get_posts = SimpleHttpOperator(
task_id="get_posts",
http_conn_id="api_posts",
endpoint="api/chat.postMessage",
method="POST",
headers={
"Authorization": "Bearer #token",
"Content-Type": "application/json"
},
data=json.dumps({
"channel": "#channel",
"text": "Hello from airflow"
}),
response_filter=lambda response: json.loads(response.text),
log_response=True,
dag=dag,
)
task_get_posts
以上是我的 dag 代码
我正在等待成功消息。我有正确的令牌和正确的频道 ID。这个 api 使用 postman 可以正常工作,但使用 dag 似乎不起作用
您是否尝试过修改 DAG 以包含超时和重试参数,类似这样?
task_get_posts = SimpleHttpOperator(
task_id="get_posts",
http_conn_id="api_posts",
endpoint="api/chat.postMessage",
method="POST",
headers={
"Authorization": "Bearer #token",
"Content-Type": "application/json"
},
data=json.dumps({
"channel": "#channel",
"text": "Hello from airflow"
}),
response_filter=lambda response: json.loads(response.text),
log_response=True,
execution_timeout=timedelta(seconds=60), # Add timeout
retries=3, # Add retries
retry_delay=timedelta(seconds=5), # Add retry delay
dag=dag,
)