我正在尝试将具有相同消息负载类型的两个单独的 Kafka 流映射到两个不同的
Consumer
beans,这两个 bean 都使用 Spring Cloud Stream 使用相同的服务实现类。
我的代码/配置类似于以下内容:
@Bean
public Consumer<MyPayload> myPayloadConsumerFromTopic1(MyPayloadProcessor myPayloadProcessor) {
return myPayloadProcessor::processMyPayload;
}
@Bean
public Consumer<MyPayload> myPayloadConsumerFromTopic2(MyPayloadProcessor myPayloadProcessor) {
return myPayloadProcessor::processMyPayload;
}
spring:
cloud:
function:
definition: myPayloadConsumerFromTopic1,myPayloadConsumerFromTopic2
stream:
bindings:
myPayloadConsumerFromTopic1-in-0:
destination: topic-1
group: myPayloadConsumerFromTopic1Group
...
myPayloadConsumerFromTopic2-in-0:
destination: topic-2
group: myPayloadConsumerFromTopic2Group
...
但是,当我的微服务启动时,我收到以下错误:
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'my-payload-processor-1.myPayloadConsumerFromTopic1myPayloadConsumerFromTopic2-in-0'
这个错误的原因是什么?我是否可以将两个包含相同有效负载类型的单独 Kafka 主题映射到具有上面列出的代码/配置的 Spring Cloud Stream 微服务中?如果不是,我需要什么额外的代码和/或配置来确保我可以做到这一点?
对于任何可能遇到同样问题并正在寻找解决方法的人,这里是解决方法。
spring:
cloud:
function:
definition: myPayloadConsumerFromTopic1;myPayloadConsumerFromTopic2
请注意,以前我使用逗号来分隔函数定义,而现在我使用分号。这解决了这个问题。