我创建了一个测试 dag,在运行测试时失败,但在 dag 失败后我无法触发 SNS 主题。
是否缺少或添加了某些内容,导致其无法工作并触发 SNS 主题?这是我输入的 Python 代码,它应该会触发有关 DAG 失败的 SNS 主题:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from pathlib import Path
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
import boto3
from airflow.providers.amazon.aws.hooks.sns import SnsHook
DAG_NAME = 'dag_name'
# Funciton which triggers SNS notification on failure
def failure_callback(context):
sns_notification = SnsPublishOperator(
sns_hook = SnsHook(sns_topic_arn='My Topic ARN'),
dag_id = context["dag"].dag_id,
task_id = context["task_instance"].task_id,
exception = str(context.get("exception")),
log_url = context["task_instance"].log_url
)
# Trigger SNS notification
sns_client = boto3.client('sns', region_name='eu-west-2')
sns_client.publish(
TopicArn='My Topic ARN',
Message=f"DAG {context['dag'].dag_id} failed on task {context['task_instance'].task_id}"
)
# Function to intentionally fail DAG (ONLY FOR TESTING)
def fail_dag():
raise Exception("Intentional DAG failure for test")
# Define DAG
default_args = {
"on_failure_callback": failure_callback,
'owner': 'Me',
'start_date': datetime(2021, 12, 9),
'is_prod': False
}
with DAG(
dag_id='dag_name',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
# To fail task intentionally
fail_task = PythonOperator(
task_id='fail_task',
python_callable=fail_dag,
)
# Dummy start and end tasks for DAG
start_task = DummyOperator(task_id='start')
end_task = DummyOperator(task_id='end')
# Dependencies (chained)
start_task >> fail_task >> end_task
globals()[DAG_NAME] = dag
如果我在 Python 代码之外缺少任何内容,请提出建议,我会研究它们。谁能帮我解决这个问题,我就会得到奖励,因为这项任务是我现在所有压力的根源。
Apache Airflow 提供回调以启用其他基于事件的功能。这是官方文档。
以下是在特定任务或操作员失败时发送电子邮件的示例:
from airflow.utils.email import send_email
def failure_alert(context):
subject = f"Task Failed: {context['task_instance_key_str']}"
body = "This task has failed. Please check the logs for more information."
send_email('[email protected]', subject, body)
... your dag code
task = DummyOperator(
task_id='failable_task',
on_failure_callback=failure_alert
)