我正在测试一个 Azure 函数,该函数具有 Blob 触发器,然后在消息队列中写入一条消息。 我禁用了“允许存储帐户密钥访问”,因为我们希望使用托管标识来访问所有资源。 问题是,当触发下面的代码时会出现错误:
```2024-12-12T01:00:08Z [Verbose] Host instance '5xxxxxxxx' failed to acquire host lock lease: Azure.Storage.Blobs: Service request failed.
Status: 403 (Key based authentication is not permitted on this storage account.)
ErrorCode: KeyBasedAuthenticationNotPermitted"```
存储帐户上的功能MI被授予存储Blob数据贡献者和存储队列数据贡献者的权限。 代码如下,来自 MSFT 提供的示例:
```import logging
import json
import base64
import time
import azure.functions as func
from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueServiceClient
app = func.FunctionApp()
@app.blob_trigger(arg_name="myblob", path="ccr1/{name}",
connection="AzureWebJobsStorage")
def blob_trigger(myblob: func.InputStream):
logging.info(f"Python blob trigger function processed blob"
f"Name: {myblob.name}"
f"Blob Size: {myblob.length} bytes")
try:
message_data = {
"file_path": myblob.name
}
blob_name = myblob.name.split('/')[-1]
if blob_name.endswith('.pdf'):
logging.info(f"Extracted PDF name: {blob_name}")
ccr_e2e_report(blob_name) # Send the PDF name to another function
else:
logging.warning("The blob is not a PDF file.")
except Exception as e:
logging.info(e)
# Example function to send data to
def ccr_e2e_report(pdf_name: str):
logging.info(f"Generating CAR report for : {pdf_name}")
time.sleep(20)
logging.info(f"Report generation complete: {pdf_name}")
@app.route(route="http_trigger", auth_level=func.AuthLevel.ANONYMOUS)
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
cnt = int(req.params.get('count'))
if cnt:
for number in range(1, cnt + 1):
message_data = {
"file_path": "None",
"file_uri": "None",
"file_metadata": f"{cnt}"
}
# Convert the Python dictionary to a JSON string
message_json = json.dumps(message_data)
send_message_to_queue("ccrqueue", message_json)
return func.HttpResponse(f"Submitted: {cnt} Messages. This HTTP triggered function executed successfully.")
else:
return func.HttpResponse(
"This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized resonse.",
status_code=200
)
```
我所有的研究告诉我,他们支持 MI 从功能应用程序访问存储帐户,但也许我在这里遗漏了一些明显的东西,任何信息都会很好。 谢谢
我已修改您的代码,通过将
DefaultAzureCredential
角色分配给 Azure 存储中的服务主体,将消息发送到本地的 Storage Queue Data Contributor
队列。
function_app.py:
import logging
import os
import json
import time
import azure.functions as func
from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueServiceClient
app = func.FunctionApp()
def send_message_to_queue(queue_name: str, message: str):
credential = DefaultAzureCredential()
queue_service_uri = os.getenv("BlobConnec__queueServiceUri")
if not queue_service_uri:
raise ValueError("Queue service URI not found.")
queue_service = QueueServiceClient(account_url=queue_service_uri, credential=credential)
queue_client = queue_service.get_queue_client(queue_name)
queue_client.send_message(message)
logging.info(f"Message sent to queue '{queue_name}': {message}")
@app.blob_trigger(arg_name="myblob", path="ccr1/{name}", connection="BlobConnec")
def blob_trigger(myblob: func.InputStream):
logging.info(f"Python blob trigger function processed blob\n"
f"Name: {myblob.name}\n"
f"Blob Size: {myblob.length} bytes")
try:
blob_name = myblob.name.split('/')[-1]
if blob_name.endswith('.pdf'):
logging.info(f"Extracted PDF name: {blob_name}")
ccr_e2e_report(blob_name)
else:
logging.warning("The blob is not a PDF file.")
except Exception as e:
logging.error(f"Error processing blob: {e}")
def ccr_e2e_report(pdf_name: str):
logging.info(f"Generating CAR report for: {pdf_name}")
time.sleep(20)
logging.info(f"Report generation complete: {pdf_name}")
@app.route(route="http_trigger", auth_level=func.AuthLevel.ANONYMOUS)
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
cnt = int(req.params.get('count', 0))
if cnt > 0:
for number in range(1, cnt + 1):
message_data = {
"file_path": "None",
"file_uri": "None",
"file_metadata": f"{number}"
}
message_json = json.dumps(message_data)
send_message_to_queue("ccrqueue", message_json)
return func.HttpResponse(f"Submitted: {cnt} Messages. This HTTP triggered function executed successfully.")
else:
return func.HttpResponse(
"This HTTP triggered function executed successfully. Pass a count in the query string or in the request body.",
status_code=200
)
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "python",
"BlobConnec__blobServiceUri": "https://<storageName>.blob.core.windows.net",
"BlobConnec__queueServiceUri": "https://<storageName>.queue.core.windows.net"
}
}
我们应该在 Azure AD 中创建一个服务主体,并将
Client ID
、Client Secret
和 Tenant ID
添加到系统环境变量中,以使该功能能够使用 DefaultAzureCredentials
,如下所示。
将服务主体详细信息添加到
System Environment Variables
。
AZURE_CLIENT_ID = <clientID>
AZURE_CLIENT_SECRET = <clientSecret>
AZURE_TENANT_ID = <TenantID>
要使用
Managed Identity
,请在 Function App 中的系统分配下启用身份。
我们需要在 Azure 存储 > 访问控制 (IAM) 设置中将
Storage Blob Data Owner
和 Storage Queue Data Contributor
角色分配给服务主体,并将 Storage Blob Data Contributor
角色分配给 Function App。
浏览器输出:
我运行了该函数并在浏览器中发送了针对 HTTP 触发函数的 GET 请求。
我向 Azure 队列存储发送了一条消息,如下所示:
我成功收到了 Azure 存储队列中的消息。
我将 PDF 文件上传到 Blob 存储,如下所示。
终端输出:
HTTP触发器和Blob触发器函数运行成功,如下所示。