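I want to use Helidon SE 4.1.6 with a Kafka producer.
Details: I have gone through the page https://helidon.io/docs/latest/se/reactive-messaging#_kafka_connector and, with the code below, I can send data to a topic using a Kafka producer.
Now I want to send data to a specific partition of a Kafka topic using Helidon SE 4.1.6.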
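import io.helidon.common.reactive.Multi;
import io.helidon.messaging.Channel;
import io.helidon.messaging.Messaging;
import io.helidon.messaging.connectors.kafka.KafkaConnector;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;

public class KafkaProducer {
    public static void init() {
        String kafkaServer = "localhost:9092";
        String topic = "topic2";

        // Outgoing channel whose subscriber side is the Kafka connector
        Channel<String> toKafka = Channel.<String>builder()
                .subscriberConfig(KafkaConnector.configBuilder()
                        .bootstrapServers(kafkaServer)
                        .topic(topic)
                        .keySerializer(StringSerializer.class)
                        .valueSerializer(StringSerializer.class)
                        .build())
                .build();

        KafkaConnector kafkaConnector = KafkaConnector.create();
        System.out.println("In producer: ");

        // Publish two test messages to the channel
        Messaging messaging = Messaging.builder()
                .publisher(toKafka, Multi.just("Test1", "Test2").map(Message::of))
                .connector(kafkaConnector)
                .build()
                .start();
    }
}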
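I then tried the code below, where I create producer records with an explicit partition so that each record goes to the specified partition.
However, I do not see the data arriving in the specified partition.
To verify the data, I am running a consumer on the topic.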
import io.helidon.common.reactive.Multi;
import io.helidon.config.Config;
import io.helidon.messaging.Channel;
import io.helidon.messaging.Messaging;
import io.helidon.messaging.connectors.kafka.KafkaConnector;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;

public class KafkaProducerPar {
    public static void init() {
        String bootstrapServers = "localhost:9092"; // Replace with your Kafka broker(s)
        String topic = "topic3";

        // Create a KafkaConnector
        Config config = Config.create();
        KafkaConnector kafkaConnector = KafkaConnector.create(config);

        // Create a Channel for producing messages
        Channel<ProducerRecord<String, String>> toKafka = Channel.<ProducerRecord<String, String>>builder()
                .subscriberConfig(KafkaConnector.configBuilder()
                        .bootstrapServers(bootstrapServers)
                        .topic(topic)
                        .keySerializer(StringSerializer.class)
                        .valueSerializer(StringSerializer.class)
                        .build())
                .build();

        // Create a Messaging instance (createMessageStream1 is not shown in this snippet)
        Messaging messaging = Messaging.builder()
                .publisher(toKafka, createMessageStream1(topic))
                .connector(kafkaConnector)
                .build()
                .start();

        // Second Messaging instance publishing records with explicit partitions 0 and 1
        Messaging messaging2 = Messaging.builder()
                .publisher(toKafka, Multi.just(
                        Message.of(new ProducerRecord<>(topic, 0, "key1", "Message for partition 0")),
                        Message.of(new ProducerRecord<>(topic, 1, "key2", "Message for partition 1"))
                ))
                .connector(kafkaConnector)
                .build()
                .start();
    }
}
You can use a custom partitioner as described here:
Channel<String> toKafka = Channel.<String>builder()
        .subscriberConfig(KafkaConnector.configBuilder()
                .bootstrapServers(kafkaServer)
                .topic(topic)
                // Kafka's partitioner.class setting accepts the fully qualified class name
                .property("partitioner.class", YourCustomPartitioner.class.getName())
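To make the idea more concrete, here is a minimal sketch of the partition-selection logic that a class like YourCustomPartitioner could delegate to. Everything in it is an assumption for illustration rather than part of Helidon or Kafka: the helper name PartitionChooser, the method choosePartition, and the key convention "<partition>|<rest>" (for example "1|key2") are all hypothetical. It uses only the JDK so it can be tried without a broker.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper: picks a partition from a key of the form "<partition>|<rest>". */
final class PartitionChooser {

    private PartitionChooser() {
    }

    /**
     * Returns the partition number encoded before the first '|' in the key when it
     * is a valid index for the topic; otherwise falls back to a hash of the key bytes.
     */
    static int choosePartition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // assumption for this sketch: records without a key go to partition 0
        }
        int sep = key.indexOf('|');
        if (sep > 0) {
            try {
                int requested = Integer.parseInt(key.substring(0, sep));
                if (requested >= 0 && requested < numPartitions) {
                    return requested;
                }
            } catch (NumberFormatException ignored) {
                // Prefix is not a number: fall through to hashing
            }
        }
        // Fallback: deterministic hash of the key bytes, mapped into [0, numPartitions)
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(choosePartition("0|key1", 2)); // prints 0
        System.out.println(choosePartition("1|key2", 2)); // prints 1
    }
}
```

In a real partitioner, this logic would sit behind Kafka's org.apache.kafka.clients.producer.Partitioner interface: its partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) method could cast the key to String, look up the partition count for the topic from the Cluster argument, and return the result of choosePartition. The interface also requires configure(Map<String, ?>) and close(), which can be left empty for a stateless partitioner like this one.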