我正在尝试使用Kafka消息密钥作为rabbitmq消息的路由密钥将消息从kafka的主题发送到多个rabbitMQ队列。 管道如下所示:kafka的主题 -> Spring Cloud Stream应用程序 ->rabbitmq
RabbitMQ 设置
兑换方式:直接兑换。所有队列都绑定到Exchange,路由键和队列名称相同。
SCS 配置
cloud:
stream:
default-binder: rabbit
function:
definition: upstreamProcessor
bindings:
upstreamProcessor-in-0:
destination: here is the kafka topic
group: ms-upstream-provider
binder: kafka
upstreamProcessor-out-0:
destination: here is the rabbitMQ exchange name
binder: rabbit
rabbit.bindings.upstreamProcessor-out-0.producer:
exchangeType: direct
使用upstreamProcessor函数我正在读取传入消息,创建新消息并发送到rabbitMQ。
但我不明白如何为每个传出消息添加路由密钥?我有很多不同的队列(> 6000),我想在代码中而不是在设置中设置路由键 在文档和网站上,我没有找到如何以编程方式设置路由键
预先感谢您的建议。
public Function<Message<String>, Message<JsonNode>> upstreamProcessor() {
return message -> {
try {
JsonNode node = mapper.readTree(message.getPayload());
String routingKey= message.getHeaders().get(KafkaHeaders.RECEIVED_KEY).toString();
return MessageBuilder.withPayload(node)
.setHeader("routingKey", routingKey)
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
}
您可以按照您所做的那样在代码中设置路由键,即
.setHeader("routingKey," routingKey).
然后您需要通过配置在生产者绑定上设置 routingKeyExpression
。
spring.cloud.stream.rabbit.bindings.upstreamProcessor-out-0.producer.routing-key-expression=headers['routingKey']
。
这样,通过
upstreamProcessor-out-0
绑定进行的任何出站发布都将参考 routingKey
标头作为 AMQP 出站端点使用的路由键。请注意,标头名称可以是任何内容 - 不一定是 routingKey
。