我有一个使用 spring-integration-mqtt (v. 6.0.4) 的 spring boot (v. 3.0.5) 项目,并且还使用 paho mqttv5 客户端。我想通过
ClientManager
和集成 DSL 设置共享订阅。但我无法让它工作。
@Bean
fun clientManager(): ClientManager<IMqttAsyncClient, MqttConnectionOptions> {
val connectionOptions = MqttConnectionOptions()
connectionOptions.serverURIs = arrayOf("tcp://example.org:1883")
val clientManager = Mqttv5ClientManager(connectionOptions, "testClient")
clientManager.setPersistence(MqttDefaultFilePersistence())
return clientManager
}
@Bean
fun mqttTestInFlow(clientManager: ClientManager<IMqttAsyncClient, MqttConnectionOptions>): IntegrationFlow {
val messageProducer = Mqttv5PahoMessageDrivenChannelAdapter(
clientManager,
"\$share/testGroup/foo/test",
)
return IntegrationFlow.from(messageProducer)
.channel("mqttInputChannel")
.get()
}
@ServiceActivator(inputChannel = "mqttInputChannel")
fun handler(message: Message<String>) {
println("Received message: ${message.payload}")
}
我可以在 mosquitto 代理的日志中看到订阅已创建,并且发布到
foo/test
的消息也发布到 testClient
(Spring 服务)。但我的处理程序从未收到这些消息。
当我从主题字符串中删除 $share/testGroup
时,一切都会正常工作。
这已在最近的 Spring Integration 版本中修复:https://github.com/spring-projects/spring-integration/issues/8879