Flink Kafka GroupId 在使用 KafkaSource 时似乎被忽略了

问题描述 投票:0回答:1

我是 Apache Flink 的新手。我尝试使用 Flink 的 KafkaSource 从 Apache Kafka 获取事件。到目前为止一切顺利,看起来效果很好。重新启动 flink 任务后,尽管我设置了 GroupId,但我再次收到相同的消息。

这是我从 Kafka 读取的代码片段:

KafkaSource<BestellungEvent> bestellungEventSource = KafkaSource.<BestellungEvent>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("bestellungen")
                .setGroupId("bla2")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new BestellungEventKeyValueDeserializationSchema()))
                .build(); 

有人可以帮我解决这个问题吗?

致以诚挚的问候

托马斯

apache-kafka apache-flink consumer
1个回答
0
投票

您正在使用

.setStartingOffsets(OffsetsInitializer.earliest())
,它从最早的偏移量开始。您应该使用
OffsetsInitializer.committedOffsets()
从组的最后一个偏移量开始消费。您可以在 documentation 中阅读有关各种选项的更多信息(我不确定我是否链接到正确版本的文档,因为您没有提到您正在使用哪个版本的 Flink Kafka Connector。)

© www.soinside.com 2019 - 2024. All rights reserved.