Spring Kafka没有正常关闭

问题描述 投票:2回答:1

我已经将ConcurrentMessageListenerContainer配置为3并发消耗3个分区,还有KafkaTemplate和producerFactory生成消息到3个分区。 Spring bean配置了destroy-method,用于在应用程序关闭时调用使用者侦听器容器的stop()和生产者。关闭后,下面显示的日志看起来像消费者已经停止,但是如果生产者停止了则没有信息。

[kafkaContainer-0-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer - 消费者停止 [kafkaContainer-2-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer - 消费者停止 [kafkaContainer-1-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer - 消费者停止

但应用程序没有完全关闭。执行jstack命令显示3个ThreadPoolTask​​Scheduler仍然在后台运行。 jstack命令的输出片段:

“ThreadPoolTask​​Scheduler-1”#74 prio = 5 os_prio = 0 tid = 0x00007f8564f23800 nid = 0x77e5等待条件[0x00007f8525292000] java.lang.Thread.State:在sun.misc.Unsafe.park(本地方法)等待(停车) - 在java.util.concurrent.locks.AbstractQueuedSynchronizer上的java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)停车等待<0x00000000ec8f0808>(java.util.concurrent.locks.AbstractQueuedSynchronizer $ ConditionObject) $ ConditionObject.await(AbstractQueuedSynchronizer.java:2039)at java.util.concurrent.ScheduledThreadPoolExecutor $ DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)at java.util.concurrent.ScheduledThreadPoolExecutor $ DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)at at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java: 617)在java.lang.Thread .RUN(Thread.java:745)

为每个MessageListener打印上面的块,我的意思是如果我将并发设置为5,那么jstack输出包含上述消息5次。所以我认为即使将消费者监听器记录为关机,它也不会完全关闭。

我错过了正确关闭生产者和消费者的事情吗?

apache-kafka spring-kafka
1个回答
1
投票

您没有说明您使用的是哪个版本。

这在2.1.0,2.0.2和1.3.2中得到修复。

你不需要从stop()方法destroy()容器;上下文将在消费者关闭时停止。

© www.soinside.com 2019 - 2024. All rights reserved.