如何使用 Microsoft 365 Worfklow 向 Microsoft Teams 发送 Airflow 失败通知

问题描述 投票:0回答:1

我有一个Airflow功能,可以在Teams组中发送失败的通知任务。随着新的微软政治的出现,我的旧 webhook 调用将在几个月内不再起作用,所以我想现在就调整代码。这是我当前的代码:

from airflow.models import Variable
import logging
import requests

def failed_task_notify_teams(context):
    logging.info("Send notification to the Teams Group")
    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "title": "Airflow Task Error",
        "summary": f"Task {context['task_instance_key_str']} : Failed",
        "themeColor": "F44336",
        "sections": [
            {
                "activityTitle": f"Task {context['task_instance_key_str']} : Failed",
                "activitySubtitle": f"DAG: {context['dag'].dag_id}",
                "facts": [
                    {
                        "name": "Date",
                        "value": context['ds']
                    },
                    {
                        "name": "Log URL",
                        "value": context['task_instance'].log_url
                    }
                ]
            }
        ],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "See Logs",
            "targets": [{
                "os": "default",
                "uri": context['task_instance'].log_url
            }]
        }]
    }
    
    headers = {"content-type": "application/json"}
    requests.post(Variable.get('teams_webhook_secret'), json=payload, headers=headers)
    logging.info("Teams notification sent to the group!")

在我看来,代码应该看起来非常相同,但该代码现在无法与 Microsoft 365 工作流一起使用。

python-3.x notifications airflow workflow microsoft-teams
1个回答
0
投票

经过进一步研究,我找到了解决方案。代码看起来实际上非常相似,我只需要更改有效负载“参数”。有修改后的代码:

from airflow.models import Variable
import logging
import requests

def failed_task_notify_teams(context):
    logging.info("Send notification to the Teams Group")
    payload = {
        "type": "message",
        "attachments" : [{
            "contentType": "application/vdn.microsoft.card.adaptive",
            "contentUrl": None,
            "content": {
                "$schema": "http://adaptivecard.io/schemas/adaptive-card.json",
                "type": "AdaptiveCard",
                "version": "1.2",
                "body": [
                    {
                        "type": "TextBlock",
                        "text": f"Task {context['task_instance_key_str']} : Failed",
                        "weight": "bolder",
                        "size": "medium",
                        "color": "attention",
                    },
                    {
                        "type": "TextBlock",
                        "text": f"DAG : {context['dag'].dag_id}",
                        "spacing": "none"
                    },
                    {
                        "type": "FactSet",
                        "facts": [
                            {
                                "title": "Date :",
                                "value": context['ds']
                            },
                            {
                                "title": "Log URL :",
                                "value": f"[Click here]({context['task_instance'].log_url})"
                            }
                        ]
                    }
                ],
                "actions": [
                    {
                        "type": "Action.OpenUrl",
                        "title": "See logs",
                        "url": context['task_instance'].log_url
                    }
                ]
            }
        }]
    }
    
    headers = {"content-type": "application/json"}
    response = requests.post(Variable.get('teams_azure_webook_secret'), json=payload, headers=headers)

    if response.status_code == 200:
        logging.info("Notification sent to Teams Group")
    else:
        logging.error(f"Failed to sent notification to Teams Group: {response.status_code} - {response.text}")
© www.soinside.com 2019 - 2024. All rights reserved.