我正在实施死信逻辑,但在使用
webClient
调用外部服务时发生异常后,无法确认记录
orderConsumer
.receive()
.concatMap(
orderRecord ->
orderService
.process(orderRecord) // use webClient to make http calls
.onErrorMap(
throwable -> {
if (throwable instanceof NonRetryableExceptionMarker) {
return new NonRetryableException(throwable, orderRecord );
}
return throwable;
}))
.onErrorResume(
throwable -> {
if (throwable instanceof NonRetryableException nonRetryableException) {
deadLetterPublishingRecoverer.accept(
nonRetryableException.getReceiverRecord(), nonRetryableException);
nonRetryableException.getReceiverRecord().receiverOffset().acknowledge();
}
return Flux.empty();
})
.repeat();
消息消费在线程
[reactive-kafka-orderGroup-1]
上开始,调用orderService.process(orderRecord)
后切换到[reactor-http-nio-2]
,
当出现错误(404错误)时,它会在同一个线程上继续
[reactor-http-nio-2]
并调用nonRetryableException.getReceiverRecord().receiverOffset().acknowledge();
但消息未提交,
确认调用必须在
[reactive-kafka-orderGroup-1]
线程上完成,因为我在调用外部服务之前尝试模拟异常并且它按预期工作
P.S:在日志中我可以看到
[reactive-kafka-orderGroup-1]
线程停止(因为repeat()
)在另一个线程`[reactor-http-nio-2]上调用
.acknowledge()
之前