我正在使用 KafkaJS ( https://kafka.js.org ) 连接到 Kafka 集群。我的消费者负责处理有时可能失败并在等待一段时间(例如 1 小时)后成功的任务。 在此持续时间之前,任何处理任何任务的尝试都将导致另一次失败。
我尝试使用https://kafka.js.org/docs/consuming#a-name-pause-resume-a-pause-resume中所述的暂停和恢复方法。然而,消费者立即重新启动并继续再次消费失败的任务。
如何暂停消费消息而不提交失败的消息并立即重新启动消费者?
另一种方法是将最后获取的偏移量保存在某个缓存或数据库中,然后调用消费者的查找方法以从最后一个偏移量开始获取。