因此,我从云函数切换到 Cloud Run,并试图弄清楚如何在本地运行 Event Arc。我知道您可以使用 Firebase 模拟器为 Eventarc 设置模拟器,但我不确定当我写入本地 Firestore 数据库时它是否会触发我的 Cloud Run 功能之一。有人可以告诉我如何做到这一点吗?
我确实在这里看到了一个模糊的答案:
但对我来说,这没有意义,因为如果我使用本地数据库和本地函数,远程实例将如何与我的本地开发环境一起工作。如果可能的话,请告诉我以及如何实现这一目标。谢谢。
这不是一件容易的事,团队正在努力让本地测试变得更容易。现在,我可以分享我的技巧。
首先,您必须知道 eventarc 大致是一个包装器,它在幕后创建多个资源,特别是 PubSub 主题和对 Cloud Run 服务的推送订阅。因此,eventarc 事件只不过是一个以事件内容作为正文的 POST 请求。
对于我的黑客攻击,我在 GCP 上有一个 Cloud Run 服务,用于记录任何传入请求的标头和正文。我设置了一个以该服务为目标的 eventarc,然后触发了一个事件。
我转到日志,复制收到的事件的标头和正文,然后用它创建一个curl POST 请求。
然后,当我想测试我的本地服务时,我重用我的curl POST请求,并将其提交到我的本地主机服务器。
为了保证 guillaume blaquiere 答案的完整性,这是我使用的代码:
使用以下代码在生产中创建一个带有 Storage -> google.cloud.storage.object.v1.finalized 类型的 EventArc 触发器的 python 云运行函数:
import functions_framework
from cloudevents.http import CloudEvent
@functions_framework.cloud_event
def hello_gcs(cloud_event: CloudEvent) -> None:
for item in cloud_event:
print(f"Item: {item} -> {cloud_event[item]}")
data = cloud_event.data
for my_data_item in data:
print(f"Data Item: {my_data_item} -> {data[my_data_item]}")
然后在存储桶中上传一个文件,这应该会触发云运行功能。
从 Google Cloud Log Explorer,您可以提取 Cloud Function 接收到的值。
使用这些值,您可以构造一个curl请求,或者在我的例子中,我宁愿使用像这样的python脚本:
import requests
import json
url = "http://localhost:5001" # Local cloud run function
headers = {
"Content-Type": "application/cloudevents+json"
}
data = {
"specversion": "1.0",
"type": "google.cloud.storage.object.v1.finalized",
"source": "//storage.googleapis.com/projects/_/buckets/my-bucket-test",
"subject": "objects/my_file.jpg",
"id": "12102065114832012",
"time": "2024-08-30T20:10:00Z",
"data": {
"kind": "storage#object",
"id": "my-bucket-test/1725048606538342",
"selfLink": "https://www.googleapis.com/storage/v1/b/my-bucket-test/o/",
"name": "my_file.jpg",
"bucket": "my-bucket-test",
"generation": "1725048606538342",
"metageneration": "1",
"contentType": "image/jpeg",
"timeCreated": "2024-08-30T20:10:06.542Z",
"updated": "2024-08-30T20:10:06.542Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2024-08-30T20:10:06.542Z",
"size": "0",
"md5Hash": "1B2ABC8AsgTpgAmY7PhCfg==",
"mediaLink": "https://storage.googleapis.com/download/storage/v1/b/my-bucket-test/o/generation=1725048606538342&alt=media",
"crc32c": "AAAAAA==",
"etag": "COb0puvCnYgDEAE=",
"temporaryHold": False,
"eventBasedHold": False
}
}
response = requests.post(url, headers=headers, data=json.dumps(data))
print(response.status_code)
print(response.text)
然后有一个本地云运行函数@ localhost:5001,如下所示:
import functions_framework
from cloudevents.http import CloudEvent
@functions_framework.cloud_event
def hello_gcs(cloud_event: CloudEvent) -> None:
for item in cloud_event:
print(f"Item: {item} -> {cloud_event[item]}")
data = cloud_event.data
for my_data_item in data:
print(f"Data Item: {my_data_item} -> {data[my_data_item]}")
我尝试使用 Firebase Eventarc 模拟器,但如果 Firebase Cloud Functions 模拟器未运行,则会失败。就我而言,我没有使用 Firebase Cloud Functions,而是使用 Google Cloud Run 函数。