使用 GCP pubsub 订阅,我们目前以这种方式拉取消息:
with subscriber_client:
try:
self.subscriber_future = subscriber_client.subscribe(
self.subscription_path, callback=add_to_local_queue, await_callbacks_on_shutdown=True
)
self._periodically_process_messages(extend=extend)
add_to_local_queue
回调将消息存储在队列中,self._periodically_process_messages
运行一个循环来处理排队的消息,如果正常则确认它们,然后休眠直到下一个 200ms 滴答。
目标是批量处理消息,以提高效率,但等待时间不超过 200 毫秒。
这在大多数情况下都有效。但我认为 python pubsub 库有自己的内部消息队列,所以也许这种双重队列管理是不必要的。 也许让图书馆处理队列是更好的方法。 这意味着以这种方式获取消息:
with subscriber_client:
while not completed:
try:
messages = subscriber_client.pull(timeout=0.2)
self._process_messages(messages)
再次强调,
self._process_messages()
会根据结果确认或拒绝消息。
哪种方法最好? 它们是否都给出了预期的行为,即按 200 毫秒批量处理传入消息?
如果要保证资源的有效利用,最好采用pull方式,每200ms处理一次消息。这种方法可以直接控制时间,并且可以有效地收集和处理消息。
为了处理高消息量并希望避免手动队列复杂性,请使用回调方法,但请注意或检查文档以了解内部队列限制。