从 pubsub 订阅者接收消息,使用拉取与使用回调

问题描述 投票:0回答:1

使用 GCP pubsub 订阅,我们目前以这种方式拉取消息:

    with subscriber_client:
        try:
            self.subscriber_future = subscriber_client.subscribe(
                self.subscription_path, callback=add_to_local_queue, await_callbacks_on_shutdown=True
            )
            self._periodically_process_messages(extend=extend)

add_to_local_queue
回调将消息存储在队列中,
self._periodically_process_messages
运行一个循环来处理排队的消息,如果正常则确认它们,然后休眠直到下一个 200ms 滴答。

目标是批量处理消息,以提高效率,但等待时间不超过 200 毫秒。

这在大多数情况下都有效。但我认为 python pubsub 库有自己的内部消息队列,所以也许这种双重队列管理是不必要的。 也许让图书馆处理队列是更好的方法。 这意味着以这种方式获取消息:

    with subscriber_client:
        while not completed:
            try:
                messages = subscriber_client.pull(timeout=0.2)
                self._process_messages(messages)

再次强调,

self._process_messages()
会根据结果确认或拒绝消息。

哪种方法最好? 它们是否都给出了预期的行为,即按 200 毫秒批量处理传入消息?

python google-cloud-platform google-cloud-pubsub
1个回答
0
投票

如果要保证资源的有效利用,最好采用pull方式,每200ms处理一次消息。这种方法可以直接控制时间,并且可以有效地收集和处理消息。

为了处理高消息量并希望避免手动队列复杂性,请使用回调方法,但请注意或检查文档以了解内部队列限制。

© www.soinside.com 2019 - 2024. All rights reserved.