消费者组中的Apache Beam KafkaIO消费者正在读取相同的消息

问题描述 投票:0回答:1

我正在数据流中使用KafkaIO来读取一个主题的消息。我使用以下代码。

KafkaIO.<String, String>read()
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000).put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
                .build())
//                .commitOffsetsInFinalize()
                .withTopics(Collections.singletonList(topicNames))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata();

我使用直接运行程序在本地运行数据流程序。一切正常。我并行运行同一程序的另一个实例,即另一个使用者。现在,我在管道的处理中看到重复的消息。

尽管我提供了使用者组ID,但是以相同的使用者组ID(同一程序的不同实例)启动另一个使用者,不应该处理由另一个使用者处理的相同元素吗?

如何使用数据流运行器呢?

google-cloud-dataflow apache-beam apache-beam-io apache-beam-pipeline apache-beam-kafkaio
1个回答
0
投票

我认为您设置的选项不能保证跨管道不重复传送消息。

  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG:这是flag for the Kafka consumer,不适用于Beam管道本身。似乎这是尽力而为的,也是周期性的,因此您可能仍会在多个管道中看到重复项。

  • withReadCommitted():这只是意味着Beam不会读取未提交的消息。同样,它不会阻止跨多个管道的重复。

请参阅here,以获取Beam光源用于确定Kafka光源的起点的协议。

为了保证不重复发送,您可能必须阅读不同的主题或不同的订阅。

© www.soinside.com 2019 - 2024. All rights reserved.