我确实有一个Song.Class
(song_id, song_name, song_duration
)。....
package org.example.demo.model;
public class Flux {
private int user_id;
private int song_id;
private float listening_duration;
public Flux(int user_id, int song_id, float listening_duration) {
this.user_id = user_id;
this.song_id = song_id;
this.listening_duration = listening_duration;
}
...
我确实有第一个程序将一些事件发送到kafka主题(在Avro中序列化:):>
props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", KafkaAvroSerializer.class.getName()); props.put("schema.registry.url","http://172.17.0.8:8081"); KafkaProducer<String, Flux> kafkaProducerFlux = new KafkaProducer<String, Flux>(props);
进行循环:
Flux flux = Flux.newBuilder() .setSongId(arraysong.get(s).getSongId()) .setUserId(arrayuser.get(u).getUserId()) .setListeningDuration(arraysong.get(s).getSongDuration()) .build(); kafkaProducerFlux.send(new ProducerRecord<String, Flux>(topic_events, flux));
....
现在,我想以流形式取回我的对象:
KStream<String, Flux> source = builder.stream("music_flux"); KStream<String, GenericRecord> source = builder.stream("music_flux");
希望能够将
user_id
与来自其他流的输入一起加入。
感谢您的帮助。
我确实有一个Song.Class(song_id,song_name,song_duration)。 ....包org.example.demo.model;公共类Flux {private int user_id; private int song_id;私有浮动监听持续时间; ...
您必须在kafka流属性中配置键和值反序列化器。由于Flux模式是通过您的Kafka生产者在模式注册表中编写的,因此Kstream应用程序将从那里读取模式。您需要配置“ SCHEMA_REGISTRY_URL_CONFIG