enable.auto.commit=false
),我发现,通过这种设置,即使我生成消费者 Kafka 的多个实例,一次也只允许连接一个消费者.该主题有一个分区,并且消费者是使用以下配置创建的。我的结论是否正确,即通过手动偏移提交行为,即使
consumer group
Kafka 中存在多个实例,每个主题也只允许一个消费者?或者这种行为是由于属性中存在
consumer group id
而表现出来的(由于 Kafka 只允许每个分区一个消费者)?
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapServer);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"2");
return new KafkaConsumer<>(properties);
在 Apache Kafka 中为什么不能有比分区更多的消费者实例?)。当您启动 Kafka Consumer 时,您需要加入一个消费者组。不这样做会导致这样的错误:
Caused by: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
注意:如果您使用 Smallrye 和 Quarkus,当消费者数量大于分区数量时,您会观察到一些奇怪的行为。奇怪的行为是,您会看到,当额外的消费者根据分区分配策略加入组时,您可能有一个现有的消费者对该分区失去了订阅,并且该分区可以转给新的消费者。当发生这种情况时,您将观察到旧的消费者(已丢失分区)仍在处理事件/消息。这些消息是 Smallrye 已经轮询并保存在内部队列中的消息,该队列将继续推送到应用程序(用
@Incoming
注释的方法)。这会导致消息的双重处理,一开始可能会给人一种印象,即 SmallRye 的行为与本机 Kafka 消费者不同。向 Smallrye 社区提出的问题:
https://github.com/smallrye/smallrye-reactive-messaging/discussions/2445