@IntegrationComponentScan annotation cannot scan @MessagingGateway components

I defined two types in the com.sz.mqtt.config package, MqttGateway and MqttConfig:

package com.sz.mqtt.config;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(String data);
}

package com.sz.mqtt.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.*;
import java.util.concurrent.Executors;


@Configuration
@IntegrationComponentScan("com.sz.mqtt.config")
@Slf4j
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfig {
    private String clientIdInbound;
    private String clientIdOutbound;
    private String url;
    private String password;
    private String username;

    @Bean
    public MqttConnectionOptions mqttConnectOptions(){
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setServerURIs(new String[] { url});
        options.setUserName(username);
        options.setPassword(password.getBytes());
        options.setAutomaticReconnect(true);
        return options;
    }

    @Bean
    public SimpleMessageConverter simpleMessageConverter(){
        return new SimpleMessageConverter();
    }

    @Bean
    public MessageHandler mqttOutboundHandler(MqttConnectionOptions connectionOptions) {
        Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(connectionOptions,clientIdOutbound);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("defaultTopic");
        messageHandler.setDefaultQos(0);
        messageHandler.setConverter(simpleMessageConverter());
        return messageHandler;
    }


    @Bean
    public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler){
        return IntegrationFlow.from("mqttOutboundChannel")
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .handle(mqttOutboundHandler)
                .get();
    }

}
My Spring Boot module (a separate Maven module) depends on the sz-common-mqtt module above:

        <dependency>
            <groupId>com.sz</groupId>
            <artifactId>sz-common-mqtt</artifactId>
            <version>${revision}</version>
        </dependency>
Then I inject the MqttGateway component directly, using Spring's @Component together with Lombok:

@Component
@Slf4j
@RequiredArgsConstructor
public class UnitClientManager {
    private final Map<Long, UnitSession> SESSION_MAP = new ConcurrentHashMap<>();
    private final MqttGateway mqttGateway;
    private final IntegrationFlowContext integrationFlowContext;
    // ... other members omitted
}

When I start the Spring Boot application, I get the following error:

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 0 of constructor in sz.device.session.UnitClientManager required a bean of type 'com.sz.mqtt.config.MqttGateway' that could not be found.


Action:

Consider defining a bean of type 'com.sz.mqtt.config.MqttGateway' in your configuration.

How can I fix this so that the MqttGateway component is autowired?
	

java spring spring-boot spring-integration

1 Answer

I ran your application. @IntegrationComponentScan works as expected, but for me it still failed with:

A component required a bean named 'mqttOutboundChannel' that could not be found.
Action: Consider defining a bean named 'mqttOutboundChannel' in your configuration.

It looks like the channel referenced by @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") is only declared on demand, inside the flow:

@Bean
public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler){
    return IntegrationFlow.from("mqttOutboundChannel")

That is indeed not visible during the scanning and injection phase.

So we have to declare the required channel object explicitly:

@Bean
DirectChannel mqttOutboundChannel() {
    return new DirectChannel();
}

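With that, it works well. It feels like your real application deals with different packages. In this sample you have:

package com.sz;

@SpringBootApplication
public class MainApplication {

which does the trick for the @ComponentScan used in Spring Boot. It scans from com.sz and is even able to reach com.sz.mqtt.config in the dependency module. That way it triggers @IntegrationComponentScan and can see the MqttGateway interface to register as a proxy bean.

After adding the channel bean, I proceeded to the next error in your configuration:

Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.mqttOutboundChannel'.
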
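This is because you do this:

@PostConstruct
public void init(){
    sendToMqtt("test","hello");
}

This is not correct. We simply do not interact with external systems during the application initialization phase. I changed it to this:

@EventListener(ApplicationReadyEvent.class)
public void readyToSend() {
    sendToMqtt("test","hello");
}
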
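
Why the timing matters can be illustrated with a toy model in plain Java. This is not Spring Integration's implementation: ToyChannel is a hypothetical stand-in that, like a subscribable channel, rejects a send while nothing is subscribed. The sketch assumes what holds for Spring Integration consumer endpoints, namely that handlers are subscribed when the context starts, which happens after @PostConstruct callbacks have already run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LifecycleOrderSketch {

    /** Hypothetical stand-in for a subscribable channel; not Spring's DirectChannel. */
    public static class ToyChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        public void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        public void send(String payload) {
            // Mirrors the symptom from the answer: no subscriber yet, so delivery fails.
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("Dispatcher has no subscribers");
            }
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(payload);
            }
        }
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        List<String> delivered = new ArrayList<>();

        // Initialization phase: a bean sends before the flow has subscribed its handler.
        try {
            channel.send("hello");
        } catch (IllegalStateException e) {
            System.out.println("during init: " + e.getMessage());
        }

        // Startup: the flow's endpoint subscribes its handler to the channel.
        channel.subscribe(delivered::add);

        // After the application is ready, the same send succeeds.
        channel.send("hello");
        System.out.println("after ready: delivered " + delivered);
    }
}
```

Running main prints "during init: Dispatcher has no subscribers" followed by "after ready: delivered [hello]", which is the same ordering effect as moving the call from @PostConstruct to an ApplicationReadyEvent listener.
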
With that change, I am able to pass the initialization phase. Yes, it still fails for me, but that is expected because I do not have an MQTT broker at tcp://127.0.0.1:1883:

Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused: getsockopt
    at org.eclipse.paho.mqttv5.client.internal.TCPNetworkModule.start(TCPNetworkModule.java:81)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms$ConnectBG.run(ClientComms.java:783)
    ... 1 more
Caused by: java.net.ConnectException: Connection refused: getsockopt

So far everything looks fine on my side.
Please update the sample so that we can see the error on our side.
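
On the package point: the original error names sz.device.session.UnitClientManager, which suggests the real application's main class may live under sz rather than com.sz. If so, its default component scan would not reach com.sz.mqtt.config, and MqttConfig (together with its @IntegrationComponentScan) would never be processed. A minimal sketch of one way to pull that configuration in explicitly is shown here. The DeviceApplication class name and the sz.device package are assumptions for illustration and do not come from the question.

```java
package sz.device; // assumed, based on 'sz.device.session' in the error message

import com.sz.mqtt.config.MqttConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

// Hypothetical main class of the consuming module.
@SpringBootApplication
@Import(MqttConfig.class) // registers the configuration class from sz-common-mqtt explicitly
public class DeviceApplication {

    public static void main(String[] args) {
        SpringApplication.run(DeviceApplication.class, args);
    }
}
```

Under the same assumption, widening the scan with @SpringBootApplication(scanBasePackages = {"sz", "com.sz"}) should have a similar effect. Either way, the explicit mqttOutboundChannel bean from the answer is still needed.
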
    
