我愿意使用Spring Batch进行Kafka数据消费。这个 spring-tips 链接有一个基本示例。
这是我的
reader
:
@Bean
KafkaItemReader<String, String> kafkaItemReader() {
var props = new Properties();
props.putAll(this.properties.buildConsumerProperties());
return new KafkaItemReaderBuilder<String, String>()
.partitions(0)
.consumerProperties(props)
.name("customers-reader")
.saveState(true)
.topic("test-consumer")
.build();
}
我的
application.properties
文件:
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: groupid-Dev
enable-auto-commit: false
auto-offset-reset: latest
auto.commit.interval.ms: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserialize
问题:
每次启动作业时,它都会寻找第 0 个偏移量。所以,我从一开始就收到消息。这是一个错误吗?
不,这是一个功能(认真的):-) 选择让kafka项读取器从分区的开头读取是为了使其与其他读取器保持一致(它们都是从数据源的开头开始)。但在 Kafka 的世界中,偏移量是一阶概念,我们将使起始偏移量可配置(为此我们有一个 PR)。这将在即将推出的 v4.3 中发布计划于 2020 年 10 月。
为什么我们需要手动提供分区来读取?
因为 Spring Batch 无法决定从给定主题名称的哪个分区读取。我们在此欢迎有关合理默认值的建议。
正如 @Mahmoud 在此 answer 中所解释的,偏移量存储在两个位置:在 kafka 端,带有
kafkaConsumer.commitSync()
,在 Spring Batch 端,带有 executionContext.put(partition, offset)
。
例如,要使读取器从Kafka中存储的偏移量开始,您需要将一个空的HashMap设置为partitionOffset,如下所示:
KafkaItemReader kafkaItemReader = new KafkaItemReaderBuilder<String, byte[]>()
.partitions(0)
.consumerProperties(props)
.pollTimeout(Duration.of(305000, ChronoUnit.MILLIS))
.name(KAFKA_CONSUMER_NAME)
.saveState(true)
.topic(topicName)
.build();
kafkaItemReader.setPartitionOffsets(new HashMap<>());
对于其他情况,例如从 ExecutionContext 读取偏移量,您可以检查 KafkaItemReaderTests 类中完成的示例。
第一个问题的答案:
您设置
enable-auto-commit: false
。在这种情况下,您必须手动提交偏移量,或者您可以将 enable-auto-commit
设置为 true。否则,因为您不提交偏移量,所以您当前的偏移量将始终为零。
回答第二个问题:
您不必手动提供要读取的分区。你只需设置topic来订阅,那么Kafka就会将该topic的分区均匀分配给同一个consumer-group中的consumer。
尝试在 read 方法之外定义 reader 对象引用。这个问题我以前遇到过,这就是解决方案
示例代码:
private KafkaItemReader<String, IRecord> kafkaItemReader;
@Autowired
public EmpolyeeBusinessItemReader(KafkaProperties properties) {
this.kafkaProperties = properties;
}
@Override
public IRecord read() {
String methodName = "readEmployeeBusiness";
LOGGER.traceEntry(methodName);
if (kafkaItemReader==null) {
Properties consumerProperties = preparekafkaReader(this.kafkaProperties);
kafkaItemReader = new KafkaItemReaderBuilder<String, IRecord>()
.partitions(1)
.consumerProperties(consumerProperties)
.name("employee-business-reader")
.saveState(true)
.topic(KafkaConfigurationNames.KafkaTopic.EMP_BUSINESS_PROCESS_TOPIC)
.build();
kafkaItemReader.open(new ExecutionContext());
}
return kafkaItemReader.read();
}
每次启动作业时,它都会寻找第 0 个偏移量。所以,我从一开始就收到消息。这是一个错误吗?
自2020年8月起,您可以在构建器上使用partitionOffsets来告诉读者它应该从Kafka中存储的消费者组ID的偏移量开始读取
return new KafkaItemReaderBuilder<String, String>()
.partitions(0)
.consumerProperties(props)
.name("customers-reader")
.saveState(true)
.topic("test-consumer")
.partitionOffsets(new HashMap<>()) // <--- here
.build();