Spring Application使用Reactor Kafka消费消息
问题一:是否有标准约定在应用程序关闭期间暂停消息消费并完成处理动态消息?
目前的实现是使用
reactiveKafkaConsumerTemplate
来接收消息。然后使用@Predestroy
,我们暂停消费者使用reactiveKafkaConsumerTemplate.pause
.
当前实施(简化)
reactiveKafkaConsumerTemplate
.receiveAutoAck()
.publishOn(Schedulers.boundedElastic())
.flatMap(x -> Mono.just(x)
.delayElement(Duration.ofMillis(300)),5)
.flatMap(message -> Mono.just(message)
.flatMap(processMessageImp::processMessage)
.onErrorResume(t -> Mono.empty())
);
public void pauseKafkaMessageConsumer() {
reactiveKafkaConsumerTemplate
.assignment()
.flatMap(topicParts -> reactiveKafkaConsumerTemplate.pause(topicParts))
.subscribe();
}
@PreDestroy
public void onExit() {
pauseKafkaMessageConsumer();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.error("onExit Error while PreDestroy ");
}
}
问题二:使用@Predestroy时如何防止WebClientRequestException
然而,在
Thread.sleep
中的 @Predestroy
之后关机期间,任何飞行消息由于 WebClientRequestException
而失败。在 Thread.sleep
过程中成功处理的消息。
WebClientRequestException: executor not accepting a task; nested exception is java.lang.IllegalStateException: executor not accepting a task
Webclient 实现使用 Springs injected builder
private final WebClient.Builder builder;
public WebClient createWebclient() {
...
return builder.build()
}
WebClientRequestException
似乎只有在使用 Spring 注入的 Webclient.Builder 时才会发生。
我采用了类似的方法,在测试期间似乎在本地有效。作为处理的一部分,它从同一主题中提取消息并将其发送到同一主题,以测试它在关闭之前是否成功地使用了所有消息。
公平地说,我不确定这是否是一个好方法。我对 Kafka 还是有点陌生,可以使用一些反馈!
@NonNull
private ReactiveKafkaConsumerTemplate<String,String> kafkaConsumerTemplate;
@NonNull
private ReactiveKafkaProducerTemplate<String,String> kafkaProducerTemplate;
@Value("${spring.kafka.consumer.partitionCount}")
private Integer topicPartitionsCount;
private boolean shuttingDown;
private final Composite disposables = Disposables.composite();
@EventListener(ApplicationReadyEvent.class)
public void postConstruct() {
for (int i = 0; i < topicPartitionsCount; i++) {
disposables.add(readWrite().subscribe());
}
}
@PreDestroy
public void preDestroy() {
shuttingDown = true;
kafkaConsumerTemplate.assignment()
.delayElements(Duration.of(20, ChronoUnit.SECONDS))
.flatMap(o -> kafkaConsumerTemplate.pause(o)).subscribe();
}
public Flux<MyMessage> readWrite() {
return kafkaConsumerTemplate
.receiveAutoAck()
.takeUntil(item -> shuttingDown)
.flatMap(item -> ingest(item.value()))
.flatMap(item -> kafkaProducerTemplate.send("example_topic", GsonFactory.getGson().toJson(item)).thenReturn(item))
.onErrorContinue((exception, errorConsumer) -> log.error("Exception while processing", exception));
}