在Helidon SE 4.1.6中,如何使用Kafka生产商将数据发送到特定分区

问题描述 投票:0回答:1

我想使用Helidon SE 4.1.6并使用Producer。

详细信息: 我已经浏览了https://helidon.io/docs/latest/se/reactive-messaging#_kafka_connector页面和下面的代码书写,可以使用KAFKA生产者将数据发送到主题。

现在我想使用Helidon SE4.1.6.将数据发送到Kafka的特定分区。

public class KafkaProducer { public static void init() { String kafkaServer ="localhost:9092"; String topic = "topic2"; Channel<String> toKafka = Channel.<String>builder() .subscriberConfig(KafkaConnector.configBuilder() .bootstrapServers(kafkaServer) .topic(topic) .keySerializer(StringSerializer.class) .valueSerializer(StringSerializer.class) .build()) .build(); KafkaConnector kafkaConnector = KafkaConnector.create(); System.out.println("In producer: "); Messaging messaging = Messaging.builder() .publisher(toKafka, Multi.just("Test1", "Test2").map(Message::of)) .connector(kafkaConnector) .build() .start(); } }
i在下面的代码下尝试了以下代码,我正在使用特定分区创建生产者记录,以便我可以转到指定的分区。
但是我看不到数据发送特定分区。

为了验证数据,我在主题上运行消费者。

public class KafkaProducerPar { public static void init() { String bootstrapServers = "localhost:9092"; // Replace with your Kafka broker(s) String topic = "topic3"; // Create a KafkaConnector Config config = Config.create(); KafkaConnector kafkaConnector = KafkaConnector.create(config); // Create a Channel for producing messages Channel<ProducerRecord<String, String>> toKafka = Channel.<ProducerRecord<String, String>>builder() .subscriberConfig(KafkaConnector.configBuilder() .bootstrapServers(bootstrapServers) .topic(topic) .keySerializer(StringSerializer.class) .valueSerializer(StringSerializer.class) .build()) .build(); // Create a Messaging instance Messaging messaging = Messaging.builder() .publisher(toKafka,createMessageStream1(topic) ) .connector(kafkaConnector) .build() .start(); Messaging messaging2 = Messaging.builder() .publisher(toKafka, Multi.just( Message.of(new ProducerRecord<>(topic, 0, "key1", "Message for partition 0")), Message.of(new ProducerRecord<>(topic, 1, "key2", "Message for partition 1")) )) .connector(kafkaConnector) .build() .start(); } }

您可以使用此处所述的自定义分区器:
apache-kafka apache-kafka-streams apache-kafka-connect kafka-producer-api helidon
1个回答
0
投票

以这种方式进行注册:

Channel<String> toKafka = Channel.<String>builder() .subscriberConfig(KafkaConnector.configBuilder() .bootstrapServers(kafkaServer) .topic(topic) .property("partitioner.class", YourCustomPartitioner.class)


最新问题
© www.soinside.com 2019 - 2025. All rights reserved.