Google Cloud PubSub消息未通过回调处理

问题描述 投票:1回答:1

我正在尝试使用Google PubSub在两个服务之间传递和接收消息。但是,发送的某些消息似乎是随机丢弃的,并且不会被订阅者的回调方法处理。

发送消息时,回调方法处理大约一半的消息。对于另一半,回调方法似乎根本没有被调用(没有记录信息)。但是,消息仍然从主题中消失,并且不会重新发送。

用于启动订户的代码:

logger = logging.getLogger(LOGGER_NAME)
logger.info('Starting the pubsub subscriber')
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(GOOGLE_CLOUD_PROJECT, SUBSCRIPTION_NAME)
subscriber.subscribe(subscription_path, callback=callback)
while True:
    try:
        sleep(60)
    except Exception as e:
        // Log exception

回调方法:

def callback(message):
    logger = logging.getLogger(LOGGER_NAME)
    logger.info(f'Recieved callback with message: {message}', extra = {'callback_message': message}  )
    // Process message

该错误似乎在订户方。消息从发布者发送,如果订阅者未连接到主题,则消息不会消失。

我曾尝试使用Flow Control来控制订阅者检索的邮件数量,但似乎没有任何效果。

可以在不调用回调方法的情况下处理消息吗?还有其他一些原因导致消息可能会从主题中消失吗?

编辑:原来另一个服务是从同一订阅读取,处理丢失的消息。

python google-cloud-platform google-cloud-pubsub
1个回答
0
投票

我知道你找到了问题的答案,但我认为列出调试此类问题的一些有用步骤是值得的:

  1. 检查以确保实际发布的消息。当发布成功时,响应应该包括消息的ID,例如,作为由QIFxswpoi方法中的APIFuture产生的字符串。
  2. 检查是否积压了积压的消息。您可以通过Java Publish查看subscription/oldest_unacked_message_agesubscription/num_undelivered_messages
  3. 检查您的订户是否设置了Stackdriver,以防止您及时收到所有消息。如果您有流量控制集并且它阻止传递所有消息,您可能会看到堆栈驱动程序中未传递消息的数量正在增加。
  4. 确保您没有任何其他客户订阅同一订阅的消息。例如,也许你正在使用flow control并查看消息。在这种情况下,您可能不会在Stackdriver中看到未传递的消息数量增加。

如果在检查完所有内容之后您不确定您的消息发生了什么,最好与支持部门联系,提供项目名称和订阅,以及您认为未交付的任何消息的ID。

© www.soinside.com 2019 - 2024. All rights reserved.