使用 Spring Integration Mqttv5 共享订阅未接收消息

问题描述 投票:0回答:1

我有一个使用 spring-integration-mqtt (v. 6.0.4) 的 spring boot (v. 3.0.5) 项目,并且还使用 paho mqttv5 客户端。我想通过

ClientManager
和集成 DSL 设置共享订阅。但我无法让它工作。

@Bean
fun clientManager(): ClientManager<IMqttAsyncClient, MqttConnectionOptions> {
    val connectionOptions = MqttConnectionOptions()
    connectionOptions.serverURIs = arrayOf("tcp://example.org:1883")
    
    val clientManager = Mqttv5ClientManager(connectionOptions, "testClient")
    clientManager.setPersistence(MqttDefaultFilePersistence())
    return clientManager
}

@Bean
fun mqttTestInFlow(clientManager: ClientManager<IMqttAsyncClient, MqttConnectionOptions>): IntegrationFlow {
    val messageProducer = Mqttv5PahoMessageDrivenChannelAdapter(
        clientManager,
        "\$share/testGroup/foo/test",
    )

    return IntegrationFlow.from(messageProducer)
        .channel("mqttInputChannel")
        .get()
}

@ServiceActivator(inputChannel = "mqttInputChannel")
fun handler(message: Message<String>) {
    println("Received message: ${message.payload}")
}

我可以在 mosquitto 代理的日志中看到订阅已创建,并且发布到

foo/test
的消息也发布到
testClient
(Spring 服务)。但我的处理程序从未收到这些消息。 当我从主题字符串中删除
$share/testGroup
时,一切都会正常工作。

spring-integration spring-integration-dsl spring-integration-mqtt
1个回答
0
投票

这已在最近的 Spring Integration 版本中修复:https://github.com/spring-projects/spring-integration/issues/8879

© www.soinside.com 2019 - 2024. All rights reserved.