我得到了带有 on_failure_callback 函数的 Airflow dag。 但是 on_failure_callback 函数在失败时不会运行,并且我没有看到日志。
有我的代码。
此代码中的所有@tasks 都有效,无需显示特殊的失败函数。 我通过向函数添加 **kwargs 来处理任务实例(另一个选项在我的情况下不起作用)。
为什么 send_message_on_dag_fail 不起作用,然后 dag 失败了?
我将相同的代码添加到任务 get_start_end_dates 只是为了确保 send_message_on_dag_fail 函数正常工作。
Airflow 2.8.2,Pyhton 3.11.8。
import pandas as pd
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pytz
import httpx
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.models import Variable
def get_current_period(date: datetime.date = None):
tz = pytz.timezone('Europe/Moscow')
if date:
now = date.date()
else:
now = datetime.now(tz).date()
if now.day <= 15:
start_date = (now - relativedelta(months=1)).replace(day=16)
end_date = now.replace(day=1) - relativedelta(days=1)
else:
start_date = now.replace(day=1)
end_date = now.replace(day=15)
return str(start_date), str(end_date)
def send_msg(bot_token: str, chat_id: str, message: str, type:str = 'message' or 'code'):
if type == 'message':
url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
client = httpx.Client(base_url='https://')
return client.post(url)
elif type == 'code':
url = f'https://api.telegram.org/bot{bot_token}/sendMessage'
params = {
'chat_id': chat_id,
'text': message,
'parse_mode': 'Markdown'
}
client = httpx.Client(base_url='https://')
return client.post(url, params=params)
def get_xcom_from_context(context, task_id: str, dict_key:str = False):
if dict_key:
xcom = context['ti'].xcom_pull(task_ids=task_id)[dict_key]
else:
xcom = context['ti'].xcom_pull(task_ids=task_id)
return xcom
default_args = {
'owner': 'user',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'start_date': datetime(2024, 9, 19)
}
host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
server_host_name = Variable.get('server_host_name')
bearer_key = Variable.get('bearer_key')
user_key = Variable.get('user_key')
sales_plans_url = Variable.get('sales_plans_url')
specialization_prices_url = Variable.get('specialization_prices_url')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')
def send_message_on_dag_fail(bot_token = bot_token, chat_id = chat_id, **kwargs):
context = kwargs
log = context['ti'].log
log.error('DAG FINISHED WITH ERROR __________________') # this error text easier to find
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"Task {task_id} from Dag {dag_id} failed."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')
@dag(default_args=default_args, schedule_interval=None, catchup=False, concurrency=4, on_failure_callback=send_message_on_dag_fail)
def dag_get_bonus_and_penaltys_for_staff():
@task
def check_time():
tz = pytz.timezone('Europe/Moscow')
current_time = datetime.now(tz).time()
if current_time >= datetime.strptime("00:00", "%H:%M").time() and current_time <= datetime.strptime("01:00", "%H:%M").time():
return False
else:
return True
@task
def get_start_end_dates(bot_token = bot_token, chat_id = chat_id,**kwargs):
context = kwargs
log = context['ti'].log
check_time = get_xcom_from_context(context, 'check_time')
log.info('Xcom objects pulled from context')
if check_time:
start_date, end_date = get_current_period()
result = {
'start_date': start_date
, 'end_date': end_date
}
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"TASK {task_id} DAG {dag_id}."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')
return result
else:
raise AirflowSkipException("Time for database cleaning, skip DAG execution.")
...
check_time_task = check_time()
get_start_end_dates_task = get_start_end_dates()
check_time_task >> get_start_end_dates_task >> ...
dag_get_bonus_and_penaltys_for_staff = dag_get_bonus_and_penaltys_for_staff()
我尝试重写 send_message_on_dag_fail ,就像仅使用上下文作为输入一样,现在它可以工作了。
我不知道为什么,因为然后我尝试在具有上下文的任务中使用上下文(不是 **kwargs),但我失败了。
有什么想法为什么会这样吗?
def send_message_on_dag_fail(context):
log = context['ti'].log
log.error('DAG FINISHED WITH ERROR __________________') # this error text easier to find
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"Task {task_id} from Dag {dag_id} failed."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')