RabbitMQ 上累积的 Paho MQTT 消息

问题描述 投票:0回答:1

我在启用 MQTT 的 RabbitMQ 代理上有一个 Paho 客户端订阅者。订阅者代码如下所示:

 def _retry_with_backoff(
        self,
        func: Callable[..., None],
        kwargs: dict[str, any],
        current_retry: int = 0,
        max_retries: int = 5,
        backoff: int = 2,
    ):
        try:
            func(**kwargs)
        except Exception as e:
            if current_retry < max_retries:
                time_to_sleep: int = backoff * current_retry
                time.sleep(time_to_sleep)
                opt_message = kwargs.get("message", None)
                message_id = (
                    opt_message.mid
                    if opt_message is not None and isinstance(opt_message, MQTTMessage)
                    else -1
                )
                logger.error(
                    "Message processing with ID %s from topic %s failed with exception %s. "
                    "Retrying after %d seconds. Current retry is %d...",
                    message_id,
                    self.get_topic(),
                    e,
                    time_to_sleep,
                    current_retry,
                )
                self._retry_with_backoff(func, kwargs, current_retry + 1, max_retries, backoff)
            else:
                logger.exception("Max retries reached. Message will be ignored.")

def on_message(self, client, userdata, message: MQTTMessage):
    self._retry_with_backoff(
            func=_on_message,
            kwargs=dict(client=client, userdata=userdata, message=message),
   )
def _on_message(client, userdata, message: MQTTMessage):
      try:
          self.process(message.payload)
      except Exception as e:
          raise e
      finally:
           logger.debug()
   

我看不到这段代码产生任何异常。即使在产生错误/异常的情况下,

except
子句也应该捕获它而不是传播它。

尽管如此,当产生

google.protobuf.message.DecodeError
时,似乎消息已累积在消费者对应的队列中。

我认为我正在捕获所有异常/错误,这些异常/错误应该翻译为

ACK
给经纪人。难道我错了?

rabbitmq mqtt paho
1个回答
0
投票

您使用的 Paho 客户端版本是什么?

on_message
回调添加额外的 try/catch 块。

如果没有明确捕获和处理,Paho 客户端将默默地捕获回调中抛出的异常。

© www.soinside.com 2019 - 2024. All rights reserved.