Rabbit Mq 在第一条消息后不使用消息

问题描述 投票:0回答:1

在队列中发布消息后,在监听端收到两次相同的消息。 在接收到第一条消息后,监听器停止进一步消费消息。在我重新启动侦听器应用程序后,它会在队列中选择重复的待处理消息。也将异常接收为 .

 SimpleMessageListenerContainer.run: 1251 | Consumer thread error, thread abort.
java.lang.NoClassDefFoundError: org/springframework/messaging/handler/invocation/MethodArgumentResolutionException
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.isCauseFatal(ConditionalRejectingErrorHandler.java:182)
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.isFatal(ConditionalRejectingErrorHandler.java:169)
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:104)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1383)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1667)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1442)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:958)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:908)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1279)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1185)
    at java.lang.Thread.run(Thread.java:750) 

>SimpleMessageListenerContainer.killOrRestart: 1398 | Stopping container from aborted consumer

@Service
@PropertySource("classpath:application.yml")
@Slf4j
public class XYZCON {

    @Autowired
    MediaFactory queueProcessingMediatorFactory;

    @RabbitListener(containerFactory="rabbitListenerContainerFactory", queues = "${rabbitmq.xyz.queue.name}")
    public void receiveMessage(MessageKeyVo messageId) {
        log.info("Received  queue message converted from Json: {}", messageId.toString());
        RequestVo messageRequest = queueProcessingMediatorFactory.executeMessageFinder(messageId);
    }
}

@Configuration
@EnableRabbit
public class RabbitMQConfig implements RabbitListenerConfigurer {
    @Value("${rabbitmq.host}")
    private String host;

    @Value("${rabbitmq.port}")
    private Integer port;

    @Value("${rabbitmq.username}")
    private String username;

    @Value("${rabbitmq.password}")
    private String password;

    @Value("${rabbitmq.concurrent.consumers}")
    private Integer concurrentConsumers;

    @Value("${rabbitmq.max.concurrent.consumers}")
    private Integer maxConcurrentConsumers;

    @Value("${rabbitmq.url}")
    private String url;

    @Value("${rabbitmq.address}")
    private String address;

    @Value("${rabbitmq.xyz.exchange.name}")
    private String xyzExchange;

    @Value("${rabbitmq.xyz.queue.name}")
    private String xyzQueue;


    @Bean
    Queue xyzMessageQueue() {
        return new Queue(xyzQueue, true, false, false);
    }

    @Bean
    TopicExchange xyzExchange() {
        return new TopicExchange(xyzExchange);
    }

    @Bean
    Binding emailMessageQueue(Queue xyzMessageQueue, TopicExchange xyzExchange) {
        return BindingBuilder.bind(xyzMessageQueue).to(xyzExchange).with(xyzQueue);
    }


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(host);
        cachingConnectionFactory.setPort(port);
        cachingConnectionFactory.setUsername(username);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setAddresses(address);
        cachingConnectionFactory.setUri(url);
        cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        cachingConnectionFactory.setRequestedHeartBeat(10);

        return cachingConnectionFactory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(concurrentConsumers);
        factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
        return factory;
    }


    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }
java spring rabbitmq amqp spring-amqp
1个回答
0
投票

java.lang.NoClassDefFoundError:org/springframework/messaging/handler/invocation/MethodArgumentResolutionException

您似乎在类路径中缺少

spring-messaging
(或不兼容的版本,或者 jar 以某种方式损坏)。

显示您的版本/依赖项。

© www.soinside.com 2019 - 2024. All rights reserved.