对于使用 Reactor Kafka 的 Spring WebFlux,我为我的消费者提供了以下代码:
public EventConsumer(KafkaReceiver<String, String> inputEventReceiver,
MessageHelper messageHelper,
List<EventProcessor> eventProcessors) {
this.inputEventReceiver = inputEventReceiver;
this.messageHelper = messageHelper;
this.eventProcessorChain = new EventProcessorChain(eventProcessors);
}
public Disposable consumeMessage() {
return processRecord()
.onBackpressureBuffer(BUFFER_SIZE)
.limitRate(MAX_BATCH_SIZE)
.retryWhen(Retry.indefinitely())
.subscribe(record -> {}, error -> log.error("error while consuming event with message {}", error.getMessage()));
}
public Flux<EventWrapper> processRecord() {
Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(inputEventReceiver::receive);
return receiverRecord.flatMap(this::processMessage);
}
private Flux<EventWrapper> processMessage(final ReceiverRecord<String, String> receiverRecord) {
EventWrapper eventWrapper = messageHelper.fetchDataFromRecord(receiverRecord);
return processEventThroughChain(eventWrapper, receiverRecord);
}
private Flux<EventWrapper> processEventThroughChain(EventWrapper eventWrapper, ReceiverRecord<String, String> receiverRecord) {
return eventProcessorChain.processEvent(eventWrapper)
.filter(EventWrapper::isValid)
.doOnNext(result -> receiverRecord.receiverOffset().acknowledge()).flux();
}
MAX_BATCH_SIZE
的值我保留为500。问题是当这个消费者开始消费时,如果事件失败(在我的例子中isValid为假),我不会提交它的偏移量,因为我希望事件重试。
但是发生的情况是,在这批事件中,如果中间的事件失败并且没有提交,它仍然不会重试,因为同一批中此之后的事件被提交。
我能想到的唯一解决方案是有一个重试逻辑,但是如果我不想错过该事件,我必须继续重试。这将占用我的一个消费者。如果我在重试后继续消费事件,这个事件将会丢失,所以这就是为什么我暂时无限期地重试,并且目前需要手动干预该问题。
那么,有什么方法可以在 WebFlux 中处理这个问题吗?对我来说似乎正确的解决方案是使用 DLQ(死信队列),但除此之外是否可以在 Reactor 端处理它?
不会重试,因为您需要将方法包装在
Flux.defer(() -> processRecord())...
或 fromSuplier
中。您可以查看这篇文章了解更多详细信息:https://www.woolha.com/tutorials/project-reactor-using-retry-and-retrywhen-examples