我在启用 MQTT 的 RabbitMQ 代理上有一个 Paho 客户端订阅者。订阅者代码如下所示:
def _retry_with_backoff(
self,
func: Callable[..., None],
kwargs: dict[str, any],
current_retry: int = 0,
max_retries: int = 5,
backoff: int = 2,
):
try:
func(**kwargs)
except Exception as e:
if current_retry < max_retries:
time_to_sleep: int = backoff * current_retry
time.sleep(time_to_sleep)
opt_message = kwargs.get("message", None)
message_id = (
opt_message.mid
if opt_message is not None and isinstance(opt_message, MQTTMessage)
else -1
)
logger.error(
"Message processing with ID %s from topic %s failed with exception %s. "
"Retrying after %d seconds. Current retry is %d...",
message_id,
self.get_topic(),
e,
time_to_sleep,
current_retry,
)
self._retry_with_backoff(func, kwargs, current_retry + 1, max_retries, backoff)
else:
logger.exception("Max retries reached. Message will be ignored.")
def on_message(self, client, userdata, message: MQTTMessage):
self._retry_with_backoff(
func=_on_message,
kwargs=dict(client=client, userdata=userdata, message=message),
)
def _on_message(client, userdata, message: MQTTMessage):
try:
self.process(message.payload)
except Exception as e:
raise e
finally:
logger.debug()
我看不到这段代码产生任何异常。即使在产生错误/异常的情况下,
except
子句也应该捕获它而不是传播它。
尽管如此,当产生
google.protobuf.message.DecodeError
时,似乎消息已累积在消费者对应的队列中。
我认为我正在捕获所有异常/错误,这些异常/错误应该翻译为
ACK
给经纪人。难道我错了?
您使用的 Paho 客户端版本是什么?
向
on_message
回调添加额外的 try/catch 块。
如果没有明确捕获和处理,Paho 客户端将默默地捕获回调中抛出的异常。