401 when trying to trigger a Power Automate (HTTP trigger) flow from a Streamlit application


I tried to trigger the Power Automate flow with the following code:

import os

import requests
import streamlit as st

def get_access_token_powerAutomate():
    tenant_id = os.getenv("TENANT_ID")
    client_id = os.getenv("CLIENT_ID")
    client_secret = os.getenv("CLIENT_SECRET")
    scope = 'https://graph.microsoft.com/.default'
   
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
   
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
   
    body = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'scope': scope
    }
   
    response = requests.post(url, headers=headers, data=body)
   
    if response.status_code == 200:
        access_token = response.json().get('access_token')
        if not access_token:
            st.error("Access token not found in the response.")
            return None
        return access_token
    else:
        st.error("Failed to obtain access token.")
        st.error(f"Response status: {response.status_code}")
        st.error(f"Response content: {response.content}")
        return None
 
def power_automateflow(organizer_upn, attendee_upns, meeting_notes, selected_subject, transcript_date):
    access_token = get_access_token_powerAutomate()
    st.write("access_token",access_token)
    if not access_token:
        st.error("Access token retrieval failed.")
        return
   
    mail_subject = f" Notes of {selected_subject} on {transcript_date}"
    # Data to be sent
    transcript_data = {
        'organizer_upn': organizer_upn,
        'attendee_upns': attendee_upns,
        'meeting_notes': meeting_notes,
        'mail_subject': mail_subject
    }
 
    # Set the headers with the access token
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
 
    # Power Automate flow endpoint URL
    power_automate_url = os.getenv("POWER_AUTOMATE_URL")
    if not power_automate_url:
        st.error("Power Automate URL is not set.")
        return
 
    # Making the POST request to Power Automate flow
    response = requests.post(power_automate_url, headers=headers, json=transcript_data)
 
    if response.status_code == 200:
        st.success("Meeting Notes sent successfully for organizer's approval!")
        st.success("Check your mail once they approve.")
    else:
        st.error(f"Failed to send Meeting Notes. Status code: {response.status_code}")
        st.error(f"Response content: {response.content}")

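I added the following permissions to the registered application:

[screenshot: API permissions added to the registered application]

I get the following error:

Failed to send Meeting Notes. Status code: 401 Response content: b'{"error":{"code":"SecurityTokenInvalidSignature","message":"The provided authentication token is not valid, token signature is not properly formatted."}}'

Any help would be appreciated.

I have tried the code above and would like to resolve the issue described.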

python power-automate streamlit unauthorized azure-http-trigger
1 Answer

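To trigger a Power Automate (HTTP trigger) flow from a Streamlit application, the registered application needs API permissions for the Power Automate API.

Create a Microsoft Entra ID application and add the API permissions as shown below:

[screenshot: API permissions added to the Microsoft Entra ID application]

A 401 error usually occurs when the access token does not carry the API permissions required to call the API. In the question, the token is requested with the scope https://graph.microsoft.com/.default, so it is issued for Microsoft Graph rather than for Power Automate.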
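To check which API a token was actually issued for, you can decode its payload and look at the aud (audience) claim. The following is a minimal debugging sketch, not part of the original code: the helper names decode_jwt_claims and token_audience are hypothetical, and the decoding deliberately does not verify the signature, so it must not be used for any security decision.

```python
import base64
import json


def decode_jwt_claims(token: str) -> dict:
    """Return the payload claims of a JWT without verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated parts")
    payload = parts[1]
    # base64url segments usually come without padding; restore it first.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def token_audience(token: str):
    """Return the 'aud' claim, i.e. the API the token was issued for."""
    return decode_jwt_claims(token).get("aud")
```

Calling token_audience(access_token) on the token from the question should show an audience that refers to Microsoft Graph, which is why the flow endpoint rejects it.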

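To resolve the error, request the token with the following scope so that it is issued for Power Automate:

https://service.flow.microsoft.com/.default

Modify the code as follows: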

import os

import requests
import streamlit as st

def get_access_token_powerAutomate():
    tenant_id = os.getenv("TENANT_ID")
    client_id = os.getenv("CLIENT_ID")
    client_secret = os.getenv("CLIENT_SECRET")
    scope = 'https://service.flow.microsoft.com/.default'
   
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
   
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
   
    body = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'scope': scope
    }
   
    response = requests.post(url, headers=headers, data=body)
   
    if response.status_code == 200:
        access_token = response.json().get('access_token')
        if not access_token:
            st.error("Access token not found in the response.")
            return None
        return access_token
    else:
        st.error("Failed to obtain access token.")
        st.error(f"Response status: {response.status_code}")
        st.error(f"Response content: {response.content}")
        return None
 
def power_automateflow(organizer_upn, attendee_upns, meeting_notes, selected_subject, transcript_date):
    access_token = get_access_token_powerAutomate()
    # Debug only: remove this line in a real app so the token is never shown in the UI.
    st.write("access_token", access_token)
    if not access_token:
        st.error("Access token retrieval failed.")
        return
   
    mail_subject = f" Notes of {selected_subject} on {transcript_date}"
    # Data to be sent
    transcript_data = {
        'organizer_upn': organizer_upn,
        'attendee_upns': attendee_upns,
        'meeting_notes': meeting_notes,
        'mail_subject': mail_subject
    }
 
    # Set the headers with the access token
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
 
    # Power Automate flow endpoint URL
    power_automate_url = os.getenv("POWER_AUTOMATE_URL")
    if not power_automate_url:
        st.error("Power Automate URL is not set.")
        return
 
    # Making the POST request to Power Automate flow
    response = requests.post(power_automate_url, headers=headers, json=transcript_data)
 
    if response.status_code == 200:
        st.success("Meeting Notes sent successfully for organizer's approval!")
        st.success("Check your mail once they approve.")
    else:
        st.error(f"Failed to send Meeting Notes. Status code: {response.status_code}")
        st.error(f"Response content: {response.content}")

After changing the scope, you will be able to trigger the Power Automate flow successfully.
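One related caveat, stated as an assumption about how the flow is configured: as far as I know, an HTTP-triggered flow that has no Response action replies with 202 Accepted rather than 200, in which case the strict `== 200` check above would report a failure even though the flow started. A small sketch of a more tolerant check (the helper name flow_call_succeeded is hypothetical):

```python
def flow_call_succeeded(status_code: int) -> bool:
    """Treat any 2xx status (for example 200 or 202) as success."""
    return 200 <= status_code < 300
```

In power_automateflow, `if flow_call_succeeded(response.status_code):` could then replace `if response.status_code == 200:`.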
