Spring Batch - Kafka:KafkaItemReader 始终从头开始读取数据

问题描述 投票:0回答:5

我愿意使用Spring Batch进行Kafka数据消费。这个 spring-tips 链接有一个基本示例。

这是我的

reader

  @Bean
  KafkaItemReader<String, String> kafkaItemReader() {
    var props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());

    return new KafkaItemReaderBuilder<String, String>()
        .partitions(0)
        .consumerProperties(props)
        .name("customers-reader")
        .saveState(true)
        .topic("test-consumer")
        .build();
  }

我的

application.properties
文件:

 spring:
    kafka:
      consumer:
        bootstrap-servers: localhost:9092
        group-id: groupid-Dev
        enable-auto-commit: false
        auto-offset-reset: latest
        auto.commit.interval.ms: 1000
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserialize

问题:

  • 每次启动作业时,它都会寻找第 0 个偏移量。所以,我从一开始就收到消息。这是一个错误吗?
  • 为什么我们需要手动提供分区来读取?将来会发生什么变化,不会影响我的代码吗?
java spring apache-kafka spring-batch spring-kafka
5个回答
2
投票

每次启动作业时,它都会寻找第 0 个偏移量。所以,我从一开始就收到消息。这是一个错误吗?

不,这是一个功能(认真的):-) 选择让kafka项读取器从分区的开头读取是为了使其与其他读取器保持一致(它们都是从数据源的开头开始)。但在 Kafka 的世界中,偏移量是一阶概念,我们将使起始偏移量可配置(为此我们有一个 PR)。这将在即将推出的 v4.3 中发布计划于 2020 年 10 月

为什么我们需要手动提供分区来读取?

因为 Spring Batch 无法决定从给定主题名称的哪个分区读取。我们在此欢迎有关合理默认值的建议。


2
投票

正如 @Mahmoud 在此 answer 中所解释的,偏移量存储在两个位置:在 kafka 端,带有

kafkaConsumer.commitSync()
,在 Spring Batch 端,带有
executionContext.put(partition, offset)

例如,要使读取器从Kafka中存储的偏移量开始,您需要将一个空的HashMap设置为partitionOffset,如下所示:

KafkaItemReader kafkaItemReader = new KafkaItemReaderBuilder<String, byte[]>()
                    .partitions(0)
                    .consumerProperties(props)
                    .pollTimeout(Duration.of(305000, ChronoUnit.MILLIS))
                    .name(KAFKA_CONSUMER_NAME)
                    .saveState(true)
                    .topic(topicName)
                    .build();
kafkaItemReader.setPartitionOffsets(new HashMap<>());

对于其他情况,例如从 ExecutionContext 读取偏移量,您可以检查 KafkaItemReaderTests 类中完成的示例。


0
投票

第一个问题的答案:

您设置

enable-auto-commit: false
。在这种情况下,您必须手动提交偏移量,或者您可以将
enable-auto-commit
设置为 true。否则,因为您不提交偏移量,所以您当前的偏移量将始终为零。

回答第二个问题:

您不必手动提供要读取的分区。你只需设置topic来订阅,那么Kafka就会将该topic的分区均匀分配给同一个consumer-group中的consumer。


0
投票

尝试在 read 方法之外定义 reader 对象引用。这个问题我以前遇到过,这就是解决方案

示例代码:

private KafkaItemReader<String, IRecord> kafkaItemReader;
@Autowired
public EmpolyeeBusinessItemReader(KafkaProperties properties) {
    this.kafkaProperties = properties;
}

@Override
public IRecord read() {
    String methodName = "readEmployeeBusiness";
    LOGGER.traceEntry(methodName);
    if (kafkaItemReader==null) {
        Properties consumerProperties = preparekafkaReader(this.kafkaProperties);
        kafkaItemReader = new KafkaItemReaderBuilder<String, IRecord>()
                .partitions(1)
                .consumerProperties(consumerProperties)
                .name("employee-business-reader")
                .saveState(true)
                .topic(KafkaConfigurationNames.KafkaTopic.EMP_BUSINESS_PROCESS_TOPIC)
                .build();

        kafkaItemReader.open(new ExecutionContext());
    }

    return kafkaItemReader.read();

}

0
投票

每次启动作业时,它都会寻找第 0 个偏移量。所以,我从一开始就收到消息。这是一个错误吗?

自2020年8月起,您可以在构建器上使用partitionOffsets来告诉读者它应该从Kafka中存储的消费者组ID的偏移量开始读取

return new KafkaItemReaderBuilder<String, String>()
    .partitions(0)
    .consumerProperties(props)
    .name("customers-reader")
    .saveState(true)
    .topic("test-consumer")
    .partitionOffsets(new HashMap<>()) // <--- here
    .build();

请参阅此链接

© www.soinside.com 2019 - 2024. All rights reserved.