我已经将ConcurrentMessageListenerContainer
配置为3并发消耗3个分区,还有KafkaTemplate
和producerFactory生成消息到3个分区。 Spring bean配置了destroy-method,用于在应用程序关闭时调用使用者侦听器容器的stop()
和生产者。关闭后,下面显示的日志看起来像消费者已经停止,但是如果生产者停止了则没有信息。
[kafkaContainer-0-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer - 消费者停止 [kafkaContainer-2-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer - 消费者停止 [kafkaContainer-1-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer - 消费者停止
但应用程序没有完全关闭。执行jstack命令显示3个ThreadPoolTaskScheduler仍然在后台运行。 jstack命令的输出片段:
“ThreadPoolTaskScheduler-1”#74 prio = 5 os_prio = 0 tid = 0x00007f8564f23800 nid = 0x77e5等待条件[0x00007f8525292000] java.lang.Thread.State:在sun.misc.Unsafe.park(本地方法)等待(停车) - 在java.util.concurrent.locks.AbstractQueuedSynchronizer上的java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)停车等待<0x00000000ec8f0808>(java.util.concurrent.locks.AbstractQueuedSynchronizer $ ConditionObject) $ ConditionObject.await(AbstractQueuedSynchronizer.java:2039)at java.util.concurrent.ScheduledThreadPoolExecutor $ DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)at java.util.concurrent.ScheduledThreadPoolExecutor $ DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)at at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java: 617)在java.lang.Thread .RUN(Thread.java:745)
为每个MessageListener打印上面的块,我的意思是如果我将并发设置为5,那么jstack输出包含上述消息5次。所以我认为即使将消费者监听器记录为关机,它也不会完全关闭。
我错过了正确关闭生产者和消费者的事情吗?
您没有说明您使用的是哪个版本。
这在2.1.0,2.0.2和1.3.2中得到修复。
你不需要从stop()
方法destroy()
容器;上下文将在消费者关闭时停止。