我们正在将 spring boot 2.2.6.release 升级到 2.7.8 并在 Kafka 侦听器上面临以下问题。
之前我们使用 SCS 3.0.4.Release 并使用@StreamListener 来消费 Kafka 消息。我们有以下一种情况,其中一个 Kafka 通道 TEST_CHANNEL 被 2 个不同的 @StreamListener 使用。当任何消息发布到 TEST_TOPIC 时,监听器都会收到每条消息的副本并对其进行处理。但我的理解是由于单个消费者组TEST_TOPIC_GROUP消息应该被每个听众以循环方式消费,一次只有一个听众消费消息并处理它。下面是我在我的项目中所做的示例配置。
@Component
public class TestListener1 {
@StreamListener("TEST_CHANNEL")
public void handle(final Message<String> message) {
log.info("execute Test Listener 1" );
}
}
@Component
public class TestListener2 {
@StreamListener("TEST_CHANNEL")
public void handle(final Message<String> message) {
log.info("execute Test Listener 2" );
}
}
配置:
spring:
cloud:
stream:
bindings:
TEST_CHANNEL:
binder: kafka
content-type: application/json
destination: TEST_TOPIC
group: TEST_TOPIC_GROUP
升级到 SCS 3.2.6 后,@StreamListener 已弃用,现在我们将使用 Consumer 接口,我们的 Kafka 监听器将实现 Consumer 接口。这里我们有 2 个不同的频道(TEST_CHANNEL_1,TEST_CHANNEL_2),具有相同的目的地(TEST_TOPIC)和消费者组(TEST_TOPIC_GROUP)。在这种情况下,两个侦听器都将收到消息,但消息是以循环方式在两个侦听器之间分发的。如场景 1 中所述,每个消息副本都不会被两个侦听器使用和处理。
@Component("TEST_CHANNEL_1")
public class TestListener1 implements Consumer<Message<String>> {
@Override
public void accept(final Message<String> message) {
log.info("execute Test Listener 1" );
}
}
@Component("TEST_CHANNEL_2")
public class TestListener2 implements Consumer<Message<String>> {
@Override
public void accept(final Message<String> message) {
log.info("execute Test Listener 2" );
}
}
配置:
spring:
cloud:
stream:
bindings:
TEST_CHANNEL_1:
binder: kafka
content-type: application/json
destination: TEST_TOPIC
group: TEST_TOPIC_GROUP
TEST_CHANNEL_2:
binder: kafka
content-type: application/json
destination: TEST_TOPIC
group: TEST_TOPIC_GROUP
是否有人更早遇到过这个问题,请提出建议?