我是 Apache Flink 的新手。我尝试使用 Flink 的 KafkaSource 从 Apache Kafka 获取事件。到目前为止一切顺利,看起来效果很好。重新启动 flink 任务后,尽管我设置了 GroupId,但我再次收到相同的消息。
这是我从 Kafka 读取的代码片段:
KafkaSource<BestellungEvent> bestellungEventSource = KafkaSource.<BestellungEvent>builder()
.setBootstrapServers("localhost:9092")
.setTopics("bestellungen")
.setGroupId("bla2")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new BestellungEventKeyValueDeserializationSchema()))
.build();
有人可以帮我解决这个问题吗?
致以诚挚的问候
托马斯
您正在使用
.setStartingOffsets(OffsetsInitializer.earliest())
,它从最早的偏移量开始。您应该使用 OffsetsInitializer.committedOffsets()
从组的最后一个偏移量开始消费。您可以在 documentation 中阅读有关各种选项的更多信息(我不确定我是否链接到正确版本的文档,因为您没有提到您正在使用哪个版本的 Flink Kafka Connector。)