如何将我的流值映射到我的对象类

问题描述 投票:0回答:1

我确实有一个Song.Classsong_id, song_name, song_duration)。....

package org.example.demo.model;

public class Flux {
private int user_id;
private int song_id;
private float listening_duration;

public Flux(int user_id, int song_id, float listening_duration) {
    this.user_id = user_id;
    this.song_id = song_id;
    this.listening_duration = listening_duration;
}

...

我确实有第一个程序将一些事件发送到kafka主题(在Avro中序列化:):>

props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url","http://172.17.0.8:8081");
KafkaProducer<String, Flux> kafkaProducerFlux = new KafkaProducer<String, Flux>(props);

进行循环:

Flux flux = Flux.newBuilder()
.setSongId(arraysong.get(s).getSongId())
.setUserId(arrayuser.get(u).getUserId())
.setListeningDuration(arraysong.get(s).getSongDuration())
.build();

kafkaProducerFlux.send(new ProducerRecord<String, Flux>(topic_events, flux));

....

现在,我想以流形式取回我的对象​​:

KStream<String, Flux> source = builder.stream("music_flux");
KStream<String, GenericRecord> source = builder.stream("music_flux");

希望能够将user_id与来自其他流的输入一起加入。

感谢您的帮助。

我确实有一个Song.Class(s​​ong_id,song_name,song_duration)。 ....包org.example.demo.model;公共类Flux {private int user_id; private int song_id;私有浮动监听持续时间; ...

java apache-kafka stream
1个回答
0
投票

您必须在kafka流属性中配置键和值反序列化器。由于Flux模式是通过您的Kafka生产者在模式注册表中编写的,因此Kstream应用程序将从那里读取模式。您需要配置“ SCHEMA_REGISTRY_URL_CONFIG

© www.soinside.com 2019 - 2024. All rights reserved.