MqttGateway
MqttConfig
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data);
}
package com.sz.mqtt.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.*;
import java.util.concurrent.Executors;
@Configuration
@IntegrationComponentScan("com.sz.mqtt.config")
@Slf4j
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfig {
private String clientIdInbound;
private String clientIdOutbound;
private String url;
private String password;
private String username;
@Bean
public MqttConnectionOptions mqttConnectOptions(){
MqttConnectionOptions options = new MqttConnectionOptions();
options.setServerURIs(new String[] { url});
options.setUserName(username);
options.setPassword(password.getBytes());
options.setAutomaticReconnect(true);
return options;
}
@Bean
public SimpleMessageConverter simpleMessageConverter(){
return new SimpleMessageConverter();
}
@Bean
public MessageHandler mqttOutboundHandler(MqttConnectionOptions connectionOptions) {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(connectionOptions,clientIdOutbound);
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("defaultTopic");
messageHandler.setDefaultQos(0);
messageHandler.setConverter(simpleMessageConverter());
return messageHandler;
}
@Bean
public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler){
return IntegrationFlow.from("mqttOutboundChannel")
.channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
.handle(mqttOutboundHandler)
.get();
}
}
我在我的Springboot模块(另一个Maven模块)中依靠上面的sz-common-mqtt
<dependency>
<groupId>com.sz</groupId>
<artifactId>sz-common-mqtt</artifactId>
<version>${revision}</version>
</dependency>
然后我直接使用Spring@Component+Lombok直接注入MQTTGATEWAY组件
@Component
@Slf4j
@RequiredArgsConstructor
public class UnitClientManager {
private final Map<Long, UnitSession> SESSION_MAP = new ConcurrentHashMap<>();
private final MqttGateway mqttGateway;
private final IntegrationFlowContext integrationFlowContext;
.......... other info
}
当我启动Springboot时,我会收到以下错误
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 0 of constructor in sz.device.session.UnitClientManager required a bean of type 'com.sz.mqtt.config.MqttGateway' that could not be found.
Action:
Consider defining a bean of type 'com.sz.mqtt.config.MqttGateway' in your configuration.
如何解决上述问题以实现自动组装MQTTGATEWAY组件的目的?
即将运行您的申请。
@IntegrationComponentScan
按预期工作,但是对于我而言,它仍然失败了:
A component required a bean named 'mqttOutboundChannel' that could not be found.
Action:
Consider defining a bean named 'mqttOutboundChannel' in your configuration.
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
。
看起来像您声明流程中的一个按需:
@Bean
public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler){
return IntegrationFlow.from("mqttOutboundChannel")
对于扫描和注射阶段,这确实不可见。
因此,我们必须对所需的频道对象进行明确的介绍:
@Bean
DirectChannel mqttOutboundChannel() {
return new DirectChannel();
}
使它运行良好。 像您的真实应用程序一样,feel会处理不同的软件包。 在此样本中,您有:
package com.sz;
@SpringBootApplication
public class MainApplication {
为春季启动中使用的
@ComponentScan
做了哪些技巧。 因此,它从那
com.sz
com.sz.mqtt.config
,甚至能够从该依赖项模块扫描到
@IntegrationComponentScan
MqttGateway
。这样,它就能触发
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.mqttOutboundChannel'.
可以看到要注册为代理bean的接口。
添加频道bean后,我继续进入您的配置的下一个错误:
@PostConstruct
public void init(){
sendToMqtt("test","hello");
}
这是因为您这样做:
@EventListener(ApplicationReadyEvent.class)
public void readyToSend() {
sendToMqtt("test","hello");
}
这是不正确的。我们只是不从应用程序初始化阶段与外部系统互动。当我对此更改时:
tcp://127.0.0.1:1883
i能够传递初始化阶段。是的,这对我来说仍然失败了,但这是可以预料的,因为我没有MQTT经纪人::
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused: getsockopt
at org.eclipse.paho.mqttv5.client.internal.TCPNetworkModule.start(TCPNetworkModule.java:81)
at org.eclipse.paho.mqttv5.client.internal.ClientComms$ConnectBG.run(ClientComms.java:783)
... 1 more
Caused by: java.net.ConnectException: Connection refused: getsockopt
到目前为止,我所有人都很好。
请更新示例以查看我们这一边的错误。