作为Spring Reactor的新手,我正在尝试使用Spring云流(使用rabbitMQ)来流数据。在将消息发送到队列之前,我需要添加一些自定义标头。
我的spring-cloud-stream的配置是:
spring:
cloud:
stream:
default:
producer:
errorChannelEnabled: true
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
binders:
rabbitInput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5672
host: localhost
rabbitOutput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5670
host: localhost
生产商参考:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {
public static void main(String[] args) {
SpringApplication.run(MessageProcessor.class, args);
}
@Bean
Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
return data -> data.map(d -> match(d, students));
}
private String match(String message, List<String> students){
return Objects.isNull(message) || message.isBlank()
? message
: String.valueOf(matchStudentName(message, students));
}
private Optional<String> matchStudentName(String message, List<String> students){
return students.stream()
.filter(name -> name.equals(message)).findFirst();
}
@Bean
Function<Flux<String>, Flux<Message<String>>> addHeaders() {
return data-> data.map(d-> MessageBuilder
.withPayload( d )
.setHeader("a", 1)
.setHeader("b", "999")
.build());
}
}
标头已成功添加到消息中,但是它在某个地方被覆盖并且没有传播到消费者。
有人可以分享他们对如何使用Spring Cloud Stream向消息中添加自定义标头的想法。
提前感谢!
[请升级到Hoxton.SR2,它将带来spring-cloud-stream 3.0.2.RELEASE。有一些更新,但是简而言之,您正在生成的消息及其中的标头应保留。
旁注:同样,由于增加了对多个输入/输出函数参数的支持,我们不得不更新函数的绑定名称约定。您可以阅读有关here的更多信息,但对您而言,这意味着您的配置需要快速更新,因为默认情况下不再使用input
和output
,因此您应该使用从函数名派生的名称
spring:
cloud:
stream:
bindings:
processMessageaddHeaders-in-0:
binder: rabbitInput
destination: inputDestination
processMessageaddHeaders-out-0:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
。 。 。或者您可以将派生的绑定名称映射到更具描述性的名称(例如input
,output
等),并改用该名称
spring:
cloud:
stream:
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
bindings:
processMessageaddHeaders-in-0: input
processMessageaddHeaders-out-0: output