在队列中发布消息后,在监听端收到两次相同的消息。 在接收到第一条消息后,监听器停止进一步消费消息。在我重新启动侦听器应用程序后,它会在队列中选择重复的待处理消息。也将异常接收为 .
SimpleMessageListenerContainer.run: 1251 | Consumer thread error, thread abort.
java.lang.NoClassDefFoundError: org/springframework/messaging/handler/invocation/MethodArgumentResolutionException
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.isCauseFatal(ConditionalRejectingErrorHandler.java:182)
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.isFatal(ConditionalRejectingErrorHandler.java:169)
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:104)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1383)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1667)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1442)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:958)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:908)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1279)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1185)
at java.lang.Thread.run(Thread.java:750)
>SimpleMessageListenerContainer.killOrRestart: 1398 | Stopping container from aborted consumer
@Service
@PropertySource("classpath:application.yml")
@Slf4j
public class XYZCON {
@Autowired
MediaFactory queueProcessingMediatorFactory;
@RabbitListener(containerFactory="rabbitListenerContainerFactory", queues = "${rabbitmq.xyz.queue.name}")
public void receiveMessage(MessageKeyVo messageId) {
log.info("Received queue message converted from Json: {}", messageId.toString());
RequestVo messageRequest = queueProcessingMediatorFactory.executeMessageFinder(messageId);
}
}
@Configuration
@EnableRabbit
public class RabbitMQConfig implements RabbitListenerConfigurer {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private Integer port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Value("${rabbitmq.concurrent.consumers}")
private Integer concurrentConsumers;
@Value("${rabbitmq.max.concurrent.consumers}")
private Integer maxConcurrentConsumers;
@Value("${rabbitmq.url}")
private String url;
@Value("${rabbitmq.address}")
private String address;
@Value("${rabbitmq.xyz.exchange.name}")
private String xyzExchange;
@Value("${rabbitmq.xyz.queue.name}")
private String xyzQueue;
@Bean
Queue xyzMessageQueue() {
return new Queue(xyzQueue, true, false, false);
}
@Bean
TopicExchange xyzExchange() {
return new TopicExchange(xyzExchange);
}
@Bean
Binding emailMessageQueue(Queue xyzMessageQueue, TopicExchange xyzExchange) {
return BindingBuilder.bind(xyzMessageQueue).to(xyzExchange).with(xyzQueue);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setAddresses(address);
cachingConnectionFactory.setUri(url);
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
cachingConnectionFactory.setRequestedHeartBeat(10);
return cachingConnectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(concurrentConsumers);
factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(consumerJackson2MessageConverter());
return factory;
}
@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
java.lang.NoClassDefFoundError:org/springframework/messaging/handler/invocation/MethodArgumentResolutionException
您似乎在类路径中缺少
spring-messaging
(或不兼容的版本,或者 jar 以某种方式损坏)。
显示您的版本/依赖项。