我正在寻找一种解决方案来运行需要 40 分钟才能执行的函数。我在文档中看到 HTTP 函数最多可以运行 60 分钟。因此,我创建了一个函数“enqueue_task”,它创建了一个调用我的“长时间运行的函数”的 CloudTask。在模拟器中,我可以运行“enqueue_task”并且它可以工作,但是当我部署此函数时,我收到异常“400 Request contains an invalid argument”
from firebase_functions import firestore_fn, https_fn
# The Firebase Admin SDK to access Cloud Firestore.
from firebase_admin import initialize_app, firestore, functions
import google.cloud.firestore
# Dependencies for task queue functions.
from google.cloud import tasks_v2
# import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
from google.auth.transport.requests import AuthorizedSession
import os, json
from dotenv import load_dotenv
import traceback
@https_fn.on_request()
def enqueue_task(req: https_fn.Request) -> https_fn.Response:
"""Adds tasks to a Cloud Tasks queue."""
credentials, _ = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
print("service account:", credentials.service_account_email)
# this is the URL of my long running task
url = "https://addmessage-XXXXX-uc.a.run.app/addmessage?text=nodanoda44444"
# Create a Cloud Tasks client.
client = tasks_v2.CloudTasksClient()
parent = client.queue_path(project=app.project_id, location="us-central1", queue="q-teste")
task = {
"http_request": { # Specify the type of request.
"http_method": tasks_v2.HttpMethod.POST,
"url": url,
"oidc_token": {
"service_account_email": credentials.service_account_email
},
}
}
# Use the client to build and send the task.
try:
response = client.create_task(parent=parent, task=task)
print(f"Created task {response.name}")
return https_fn.Response(f"Created task {response.name}")
except Exception as e:
print(f"Error creating task {e}")
print(traceback.format_exc())
return https_fn.Response(f"Error creating task ", status=400)
当我在模拟器中运行时,我可以调用长时间运行的函数,但部署的版本显示“400 请求包含无效参数”
出现错误的行是:
response = client.create_task(parent=parent, task=task)
问题似乎是权限问题,我能够设置一个新的服务帐户并使用以下代码
from google.oauth2 import service_account
encoded_cred = os.environ.get('ALT_ACCOUNT')
decoded_cred = base64.b64decode(encoded_cred)
alt_account = json.loads(decoded_cred.decode('utf-8'))
alt_credentials = service_account.Credentials.from_service_account_info(alt_account)
现在在我的函数中我调用
client = tasks_v2.CloudTasksClient(credentials=alt_credentials)