尝试从 Airflow DAG 失败中触发 AWS SNS 主题,以调用为失败的 DAG 创建 JIRA 票证的 Lambda 函数

问题描述 投票:0回答:1

我创建了一个测试 dag,在运行测试时失败,但在 dag 失败后我无法触发 SNS 主题。

是否缺少或添加了某些内容,导致其无法工作并触发 SNS 主题?这是我输入的 Python 代码,它应该会触发有关 DAG 失败的 SNS 主题:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from pathlib import Path
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
import boto3
from airflow.providers.amazon.aws.hooks.sns import SnsHook

DAG_NAME = 'dag_name'

# Funciton which triggers SNS notification on failure
def failure_callback(context):
    sns_notification = SnsPublishOperator(
        sns_hook = SnsHook(sns_topic_arn='My Topic ARN'),
        dag_id = context["dag"].dag_id,
        task_id = context["task_instance"].task_id,
        exception = str(context.get("exception")),
        log_url = context["task_instance"].log_url
    )

# Trigger SNS notification
    sns_client = boto3.client('sns', region_name='eu-west-2')
    sns_client.publish(
        TopicArn='My Topic ARN',
        Message=f"DAG {context['dag'].dag_id} failed on task {context['task_instance'].task_id}"
    )

# Function to intentionally fail DAG (ONLY FOR TESTING)
def fail_dag():
    raise Exception("Intentional DAG failure for test")

# Define DAG
default_args = {
    "on_failure_callback": failure_callback,
    'owner': 'Me',
    'start_date': datetime(2021, 12, 9),
    'is_prod': False
}

with DAG(
    dag_id='dag_name',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    
    # To fail task intentionally
    fail_task = PythonOperator(
        task_id='fail_task',
        python_callable=fail_dag,
    )

    # Dummy start and end tasks for DAG
    start_task = DummyOperator(task_id='start')
    end_task = DummyOperator(task_id='end')

    # Dependencies (chained)
    start_task >> fail_task >> end_task

globals()[DAG_NAME] = dag

如果我在 Python 代码之外缺少任何内容,请提出建议,我会研究它们。谁能帮我解决这个问题,我就会得到奖励,因为这项任务是我现在所有压力的根源。

python amazon-web-services aws-lambda airflow amazon-sns
1个回答
0
投票

Apache Airflow 提供回调以启用其他基于事件的功能。这是官方文档

以下是在特定任务或操作员失败时发送电子邮件的示例:

from airflow.utils.email import send_email

def failure_alert(context):
    subject = f"Task Failed: {context['task_instance_key_str']}"
    body = "This task has failed. Please check the logs for more information."
    send_email('[email protected]', subject, body)

... your dag code

    task = DummyOperator(
        task_id='failable_task',
        on_failure_callback=failure_alert
    )
© www.soinside.com 2019 - 2024. All rights reserved.