Flink Kafka SQL 设置'auto.offset.reset'

问题描述 投票:0回答:2

在使用

auto.offset.reset
作为扫描启动模式时,我无法将
group-offsets
设置为最新。我已经尝试使用属性。* 正如文档中提到的 - https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position,但仍然使用
auto.offset.reset
=none 创建的 Kafka 消费者组(在 Flink 日志中验证)并且作业失败并出现错误:
Undefined offset with no reset policy for partitions: .... 

CREATE TABLE test (id int, order_time timestamp(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND )
WITH (
       'connector' = 'kafka',
       'topic' = 'test_topic',
       'properties.group.id' = 'testGroup',
       'properties.bootstrap.servers' = 'localhost:9092',
       'format' = 'avro-confluent',
       'scan.startup.mode' = 'group-offsets',
       'properties.auto.offset.reset' = 'latest',
       'avro-confluent.url' = 'http://localhost:8081'
  )

Flink 版本:1.14.0。

堆栈跟踪:

Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test_topic-11]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:683)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2420)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1750)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1709)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:375)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:260)
at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more

我错过了什么吗?

apache-flink
2个回答
0
投票

您在定义中定义了

properties.auto.offset.reset
,但文档中不存在此类属性。如果删除它会发生什么?

除此之外,我想知道您是否也患有https://issues.apache.org/jira/browse/FLINK-24697

编辑:我读错了,

properties
键被转发。但我想知道如果您将该值设置为
latest
同时为
group-offsets
指定
scan.startup.mode
,您会期望什么。


0
投票

自 Flink 1.15+ 起,内部 KafkaConsumer 已被删除,用户应该使用 较新的 Flink 版本,至少 Flink 1.15 以上。

注意:从 Flink 1.15 开始,Flink 默认使用 AdminClient 来寻找分区的位置。仅当启用检查点时,一旦触发检查点,偏移量就会提交给消费者组,顺便说一句,您的消费者组将出现在您的kafka中。请参阅我的研究:Flink-36674

 switch (startupMode) {
        case EARLIEST:
            kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
            break;
        case LATEST:
            kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
            break;
        case GROUP_OFFSETS:
            kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
            break;
 ...

 }

...

当您将

scan.startup.mode
设置为
group-offsets
时,
KafkaSourceBuilder
会使用
startingOffsets
CommitedOffsets
设置为
offsetResetStrategy NONE

static OffsetsInitializer committedOffsets() {
    return committedOffsets(OffsetResetStrategy.NONE);
}

flink Kafka启动模式选项

scan.startup.mode
最初由FLINK-22914创建

据我所知,scan.startup.mode 用于告诉 KafkaSourceEnumerator 在您的消费者组中定位提交的偏移量。

提交的偏移量必须存在,否则 flink kafka 源会失败,并且没有偏移量更改以保持消费者偏移量干净。

© www.soinside.com 2019 - 2024. All rights reserved.