在使用
auto.offset.reset
作为扫描启动模式时,我无法将 group-offsets
设置为最新。我已经尝试使用属性。* 正如文档中提到的 - https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position,但仍然使用 auto.offset.reset
=none 创建的 Kafka 消费者组(在 Flink 日志中验证)并且作业失败并出现错误:Undefined offset with no reset policy for partitions: ....
CREATE TABLE test (id int, order_time timestamp(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND )
WITH (
'connector' = 'kafka',
'topic' = 'test_topic',
'properties.group.id' = 'testGroup',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'latest',
'avro-confluent.url' = 'http://localhost:8081'
)
Flink 版本:1.14.0。
堆栈跟踪:
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test_topic-11]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:683)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2420)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1750)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1709)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:375)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:260)
at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more
我错过了什么吗?
您在定义中定义了
properties.auto.offset.reset
除此之外,我想知道您是否也患有https://issues.apache.org/jira/browse/FLINK-24697
编辑:我读错了,
properties
键被转发。但我想知道如果您将该值设置为 latest
同时为 group-offsets
指定 scan.startup.mode
,您会期望什么。
自 Flink 1.15+ 起,内部 KafkaConsumer 已被删除,用户应该使用 较新的 Flink 版本,至少 Flink 1.15 以上。
注意:从 Flink 1.15 开始,Flink 默认使用 AdminClient 来寻找分区的位置。仅当启用检查点时,一旦触发检查点,偏移量就会提交给消费者组,顺便说一句,您的消费者组将出现在您的kafka中。请参阅我的研究:Flink-36674
switch (startupMode) {
case EARLIEST:
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
break;
case LATEST:
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
break;
case GROUP_OFFSETS:
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
break;
...
}
...
当您将
scan.startup.mode
设置为 group-offsets
时,KafkaSourceBuilder
会使用 startingOffsets
将 CommitedOffsets
设置为 offsetResetStrategy NONE
。
static OffsetsInitializer committedOffsets() {
return committedOffsets(OffsetResetStrategy.NONE);
}
flink Kafka启动模式选项
scan.startup.mode
最初由FLINK-22914创建
据我所知,scan.startup.mode 用于告诉 KafkaSourceEnumerator 在您的消费者组中定位提交的偏移量。
提交的偏移量必须存在,否则 flink kafka 源会失败,并且没有偏移量更改以保持消费者偏移量干净。