如何使用Spring Cloud Stream Supplier将密钥消息发送到Kafka

问题描述 投票:1回答:1

我想使用Spring Cloud Stream生成发给Kafka的带键(带有特定键的消息)消息。

@SpringBootApplication
public class SpringCloudStreamKafkaApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
  }

  @Bean
  Supplier<DataRecord> process(){
    return () -> new DataRecord(42L);
  }

}

我需要更改供应商代码以提供密钥吗?是否可以使用API​​的新样式(使用Lambda)?

谢谢

spring-boot spring-kafka spring-cloud-stream
1个回答
0
投票

返回Message<?>并设置KafkaHeaders.MESSAGE_KEY标题:

@Bean
Supplier<Message<String>> process() {
    return () -> MessageBuilder.withPayload("foo")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
            .build();
}

(假定默认密钥序列化程序(字节[])。

编辑

这将被无休止地调用。

如果您想发送有限的流,我相信您必须切换到反应模型。

@Bean
Supplier<Flux<Message<String>>> processFinite() {
    Message<String> msg1 = MessageBuilder.withPayload("foo")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
            .build();
    Message<String> msg2 = MessageBuilder.withPayload("baz")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
            .build();
    return () -> {
        return Flux.just(msg1, msg2);
    };
}

还有Flux.fromStream(myStream)

将在流的结尾处结束。

© www.soinside.com 2019 - 2024. All rights reserved.