在我们的项目中,我们使用 GCP pubsub 将异步作业从应用程序路由到工作线程。 要订阅,我们使用
subscribe
中的 pubsub_v1.SubscriberClient
方法。它返回 subscriber.futures.StreamingPullFuture
对象,example:
future = subscriber_client.subscribe(
subscription, callback)
为了监控订阅是否有效,我们使用 StreamingPullFuture API 示例中的 running:
if future.running():
do something
else:
do something else
问题就在这里。当工作人员获得高流量时 - 在 pubsub 中,消息数量超出工作人员在一段时间内能够处理的数量,此检查
future.running()
评估为 false
。另一方面,当工作人员能够跟上流量/流量相当低时,检查的评估结果始终为 true
。
知道为什么会这样吗?以及如何处理
future.running() == false
?我们应该重新订阅吗?
听起来订阅者对工作感到不知所措,无法跟上保持流对服务开放所需的基本操作,例如流上的心跳。如果处理消息的强度足够大,以至于订阅者在高负载下变得资源受限,那么您可能需要设置更严格的流量控制限制,以便在发布负载增加时减少订阅者客户端的未完成工作量。
有了流量控制限制,您可以水平扩展订阅者数量,以便在相同的时间内处理消息(将负载分散到更多实例上),也可以允许单个订阅者处理消息订阅者,从而将负载分散到更长的时间内。