我有一个 Cloud Run 函数,它由 Firestore 的 `google.cloud.firestore.document.v1.written` 事件触发,代码如下:
from cloudevents.http import CloudEvent
import functions_framework
from google.events.cloud import firestore
from google.protobuf.json_format import MessageToDict
@functions_framework.cloud_event
def hello_firestore(cloud_event: CloudEvent) -> None:
    """Print the new and old document states carried by a Firestore event.

    Args:
        cloud_event: CloudEvent whose ``data`` is parsed as a serialized
            ``DocumentEventData`` protobuf message.
    """
    payload = firestore.DocumentEventData()
    payload._pb.ParseFromString(cloud_event.data)

    # Each snapshot is converted from protobuf to a dictionary; a falsy
    # snapshot is reported as an empty dict instead.
    if payload.old_value:
        old_value_dict = MessageToDict(payload.old_value._pb)
    else:
        old_value_dict = {}

    if payload.value:
        new_value_dict = MessageToDict(payload.value._pb)
    else:
        new_value_dict = {}

    print(new_value_dict)
    print(old_value_dict)
但是,输出似乎是这种格式:
{
"name": "projects/my-gcp-project/databases/mydb/documents/mycollection/e2767cc0-2e04-452d-9b7b-fc170e2128ec",
"fields": {
"created_at": {"stringValue": "2024-10-31T11:09:34.760557Z"},
"session_id": {"stringValue": "e2767cc0-2e04-452d-9b7b-fc170e2128ec"},
"user_profile": {
"mapValue": {
"fields": {
"age": {"integerValue": "36"},
"name": {"stringValue": "Andrew"},
"middle_name": {"nullValue": None}
}
}
},
....
"createTime": "2024-10-31T11:09:34.802575Z",
"updateTime": "2024-10-31T12:13:36.154031Z",
}
如何以编程方式解码消息,以便将所有内容转换为 Python 本机类型,格式如下:
{
"created_at": "2024-10-31T11:09:34.760557Z",
"session_id": "e2767cc0-2e04-452d-9b7b-fc170e2128ec",
"user_profile": {
"age": 36,
"name": "Andrew",
"middle_name": None,
}
}
令人惊讶的是,Google 的库中似乎没有(!?)提供执行这种转换的函数。
正如 @frank-van-puffelen 所建议的,自己实现这个转换虽然繁琐,但并不复杂:
requirements.txt:
google-cloud-firestore==2.19.0
google-events==0.13.0
main.py:
from proto.marshal.collections.maps import MapComposite
from google.events.cloud.firestore import (
Document,
DocumentEventData,
ArrayValue,
MapValue,
Value,
)
def DocumentToDict(d: Document) -> dict:
    """Convert a Firestore ``Document`` message into a plain dictionary.

    Args:
        d: The document to convert.

    Returns:
        A dict with the keys ``name``, ``fields``, ``create_time`` and
        ``update_time``. ``fields`` is decoded with ``MapCompositeToDict``;
        the other three values are copied from the document unchanged.
    """
    converted: dict = {}
    converted["name"] = d.name
    converted["fields"] = MapCompositeToDict(d.fields)
    converted["create_time"] = d.create_time
    converted["update_time"] = d.update_time
    return converted
def ArrayValueToList(a: ArrayValue) -> list:
    """Decode every element of a Firestore ``ArrayValue``.

    Args:
        a: The array value whose ``values`` are decoded.

    Returns:
        A new list holding ``ValueToDict(element)`` for each element, in the
        original order.
    """
    return [ValueToDict(element) for element in a.values]
def MapCompositeToDict(m: MapComposite) -> dict:
    """Decode a map of field names to Firestore ``Value`` messages.

    Args:
        m: Mapping whose values are decoded one by one.

    Returns:
        A new dict with the same keys, each mapped to ``ValueToDict(value)``.
    """
    return {
        field_name: ValueToDict(field_value)
        for field_name, field_value in m.items()
    }
def ValueToDict(v: "Value") -> object:
    """Decode a single Firestore ``Value`` into the data it wraps.

    The populated member of the ``value_type`` oneof selects the conversion:
    ``null_value`` becomes ``None``, ``array_value`` becomes a list (via
    ``ArrayValueToList``), ``map_value`` becomes a dict (via
    ``MapCompositeToDict``), and every other known member is returned exactly
    as the message exposes it (``timestamp_value`` and ``geo_point_value``
    are not converted any further).

    Args:
        v: The value to decode.

    Returns:
        The decoded value; despite the function's name this is only a dict
        for ``map_value``.

    Raises:
        ValueError: If the populated oneof member is not one of the handled
            kinds, or if no member is populated at all.
    """
    converters = {
        "null_value": lambda val: None,
        "boolean_value": lambda val: val.boolean_value,
        "integer_value": lambda val: val.integer_value,
        "double_value": lambda val: val.double_value,
        "timestamp_value": lambda val: val.timestamp_value,
        "string_value": lambda val: val.string_value,
        "bytes_value": lambda val: val.bytes_value,
        "reference_value": lambda val: val.reference_value,
        "geo_point_value": lambda val: val.geo_point_value,
        "array_value": lambda val: ArrayValueToList(val.array_value),
        "map_value": lambda val: MapCompositeToDict(val.map_value.fields),
    }
    kind = v._pb.WhichOneof("value_type")
    # .get() rather than indexing: an unknown (or unset, i.e. None) kind must
    # reach the ValueError below instead of escaping as a bare KeyError.
    convert = converters.get(kind)
    if convert is None:
        raise ValueError(f"Unknown type: {kind}")
    return convert(v)
# Sample document as JSON text (camelCase keys), used to exercise the
# converters; it contains string, integer, null and nested map values.
data = """
{
"name": "projects/my-gcp-project/databases/mydb/documents/mycollection/e2767cc0-2e04-452d-9b7b-fc170e2128ec",
"fields": {
"created_at": {"stringValue": "2024-10-31T11:09:34.760557Z"},
"session_id": {"stringValue": "e2767cc0-2e04-452d-9b7b-fc170e2128ec"},
"user_profile": {
"mapValue": {
"fields": {
"age": {"integerValue": "36"},
"name": {"stringValue": "Andrew"},
"middle_name": {"nullValue": 0}
}
}
}
},
"createTime": "2024-10-31T11:09:34.802575Z",
"updateTime": "2024-10-31T12:13:36.154031Z"
}
"""
# Parse the JSON text into a Document message, convert it to a plain dict,
# and print only the decoded "fields" entry.
value: Document = Document.from_json(data)
d: dict = DocumentToDict(value)
print(d["fields"])
输出:
{
'created_at': '2024-10-31T11:09:34.760557Z',
'session_id': 'e2767cc0-2e04-452d-9b7b-fc170e2128ec',
'user_profile': {
'age': 36,
'middle_name': None,
'name': 'Andrew'
}
}