我想使用Spring Cloud Stream生成发给Kafka的带键(带有特定键的消息)消息。
@SpringBootApplication
public class SpringCloudStreamKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
}
@Bean
Supplier<DataRecord> process(){
return () -> new DataRecord(42L);
}
}
我需要更改供应商代码以提供密钥吗?是否可以使用API的新样式(使用Lambda)?
谢谢
返回Message<?>
并设置KafkaHeaders.MESSAGE_KEY
标题:
@Bean
Supplier<Message<String>> process() {
return () -> MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
.build();
}
(假定默认密钥序列化程序(字节[])。
编辑
这将被无休止地调用。
如果您想发送有限的流,我相信您必须切换到反应模型。
@Bean
Supplier<Flux<Message<String>>> processFinite() {
Message<String> msg1 = MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
.build();
Message<String> msg2 = MessageBuilder.withPayload("baz")
.setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
.build();
return () -> {
return Flux.just(msg1, msg2);
};
}
还有Flux.fromStream(myStream)
。
将在流的结尾处结束。