我的项目需要支持mqtt。我使用 RabbitMQ 作为代理。我开发了 Spring Boot 应用程序,并使用 Spring Integration MQTT。
@Configuration
public class MqttConfig {
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory clientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{ "tcp://localhost:1883" });
options.setUserName("user_admin");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", clientFactory(), "example_topic");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("MESSAGE: " + message.getPayload());
}
};
}
}
pom.xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.2.1</version>
</dependency>
我在不同的端口 8081 和 8082 运行同一应用程序的两个实例
java -jar target/mqtt_project-0.0.1-SNAPSHOT.jar --server.port=8081
java -jar target/mqtt_project-0.0.1-SNAPSHOT.jar --server.port=8082
我使用MQTTX Client Toolbox进行测试。 当我发送几条关于该主题的消息时
example_topic
它总是到达同一个端口 8082
MESSAGE: Hello World // port 8082
MESSAGE: Hello World // port 8082
MESSAGE: Hello World // port 8082
我如何实现 MQTT 共享订阅以允许客户端负载平衡,这意味着代理在特定主题的订阅客户端之间平均分配消息负载?
MESSAGE: Hello World // port 8081
MESSAGE: Hello World // port 8082
MESSAGE: Hello World // port 8081
MESSAGE: Hello World // port 8082
如果代理支持共享订阅,那么使用它们只需向客户端订阅的主题添加正确的前缀即可。
例如
$shared/group-identifier/example_topic
其中
group-identifier
是客户端集合的唯一ID