GCP Pub/Sub 订单密钥属性不起作用

问题描述 投票:0回答:1

我正在为 GCP Pub/Sub 和 fastAPI 使用 python 客户端。我有一个端点的推送订阅,并且正在发送带有 order key 属性的消息。然而,大约有四分之一的时间,我在接收端点上收到的消息是乱序的。

我有一个启用了 order key 属性的 Pub/Sub 订阅: 订阅详情截图

我正在使用 GCP Pub/Sub 的 python 客户端发布 2 条消息:

from google.cloud import pubsub
import json
import typing

pubsub_client = pubsub.PublisherClient()


def fire_event_to_pubsub(
    event_topic_name: str,
    event_payload: dict[str, typing.Any],
    order_key: str,
) -> None:
    # Publish the event
    topic_path = pubsub_client.topic_path(
        project="project_name", topic=event_topic_name
    )
    message_json = json.dumps(event_payload)
    # Encode the JSON string to bytes
    message_bytes = message_json.encode("utf-8")
    future = pubsub_client.publish(topic_path, data=message_bytes, order_key=order_key)
    print(future.result())
    print(
        f"Published message to topic {topic_path}: {message_json} with order key {order_key}"
    )


form_payload1 = {
    "application_id": "50030469",
    "form_path": "gs://fa-form-received-14534/application1/50030469.PDF",
    "form_hash": "1111",
}

fire_event_to_pubsub(
    "form_received_ordered",
    form_payload1,
    "50030469" + "-" + "form_name",
)

form_payload2 = {
    "application_id": "50030469",
    "form_path": "gs://fa-form-received-14534/application2/50030469.PDF",
    "form_hash": "2222",
}
fire_event_to_pubsub(
    "form_received_ordered",
    form_payload2,
    "50030469" + "-" + "form_name",
)

有一个记录消息的 fastAPI 端点。然而,它们在日志中似乎没有顺序。在这里您可以看到正在使用的订单键属性 - 并且第二条消息的发布时间稍早 - 所以我希望它首先被记录: 日志截图

有人遇到过这个问题吗?我有什么遗漏的吗?

python fastapi google-cloud-pubsub
1个回答
0
投票

使用排序键进行发布的属性是

ordering_key
,而不是
order_key
。通过在
order_key
中使用
publish
代替,发布者将该字段视为属性,而不是将其放入
ordering_key
属性中,这意味着无法保证交货订单。您还应该在
PublisherOptions
中启用发布者客户端本身的排序。这不太可能影响您编写的代码,因为您在发布下一个发布之前会等待一个发布的未来结果,但如果您曾经异步等待结果,则不设置
enable_message_ordering
可能会导致无序发布。

请参阅 使用订购键在 Python 中发布的示例代码,最重要的是:

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
...
future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
© www.soinside.com 2019 - 2024. All rights reserved.