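I have a simple Rabbit listener that always throws AmqpRejectAndDontRequeueException (to test whether the listener can handle multiple invalid messages).
The listener and its configuration are: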
@RabbitListener(
        id = TestConfig.LISTENER_ID,
        queues = TestConfig.DATA_QUEUE,
        containerFactory = TestConfig.LISTENER_FACTORY
)
public void consume(String data, Message message) {
    if (true) {
        throw new AmqpRejectAndDontRequeueException("don't requeue");
    }
}

static final String LISTENER_ID = "listenerId";
static final String DATA_QUEUE = "data.queue";
static final String LISTENER_FACTORY = "listenerFactory";

private final AmqpAdmin amqpAdmin;
private final ConnectionFactory connectionFactory;

TestConfig(AmqpAdmin amqpAdmin, ConnectionFactory connectionFactory) {
    this.amqpAdmin = amqpAdmin;
    this.connectionFactory = connectionFactory;
}

@Bean
RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public Queue consumedTripQueue() {
    Queue queue = new Queue(DATA_QUEUE);
    queue.setAdminsThatShouldDeclare(amqpAdmin);
    return queue;
}

@Bean(name = LISTENER_FACTORY)
public SimpleRabbitListenerContainerFactory listenerFactory(RabbitTransactionManager rabbitTransactionManager) {
    StatefulRetryOperationsInterceptor backOffRetryInterceptor = statefulRetryOperationsInterceptor();
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(1);
    factory.setAutoStartup(true);
    factory.setTransactionManager(rabbitTransactionManager);
    factory.setAdviceChain(backOffRetryInterceptor);
    return factory;
}

private StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
    RejectAndDontRequeueRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer(); // to be changed
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryContextCache(new MapRetryContextCache(3));
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
    return RetryInterceptorBuilder.stateful()
            .retryOperations(retryTemplate)
            .recoverer(messageRecoverer)
            .build();
}

When AmqpRejectAndDontRequeueException is thrown in the listener, the MapRetryContextCache fills up but is never cleared. The application eventually throws RetryCacheCapacityExceededException.
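I managed to get it working by throwing a custom exception from the listener and handling it in a custom message recoverer. In that case, however, the message is still requeued once, in line with the retry policy. What am I doing wrong? Should I not use AmqpRejectAndDontRequeueException together with a stateful interceptor?
How can I reject a message without requeueing it when using a stateful interceptor?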
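Spring Retry knows nothing about the semantics of Spring AMQP exceptions.
To avoid this problem, you should configure the interceptor so that AmqpRejectAndDontRequeueException is a non-retryable exception.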
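
As a rough illustration of that classification, the retry policy from the question could be given an exception map that marks AmqpRejectAndDontRequeueException as not retryable. This is only a sketch under assumptions: the holder class RetryInterceptorSketch is hypothetical, the four-argument SimpleRetryPolicy constructor (with traverseCauses and a default value) is assumed to be available in the Spring Retry version in use, and traverseCauses is switched on in case the exception reaches the policy wrapped in another exception. It covers only the retry policy and has not been checked against the cache-overflow scenario described in the question.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical holder class, used only to keep the sketch self-contained.
final class RetryInterceptorSketch {

    static StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
        // Classify AmqpRejectAndDontRequeueException as NOT retryable.
        Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
        retryable.put(AmqpRejectAndDontRequeueException.class, false);

        // Arguments: maxAttempts (same value as in the question), the map above,
        // traverseCauses = true (assumption: the exception may arrive wrapped),
        // defaultValue = true (all other exceptions stay retryable).
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(1, retryable, true, true);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryContextCache(new MapRetryContextCache(3));
        retryTemplate.setRetryPolicy(retryPolicy);

        return RetryInterceptorBuilder.stateful()
                .retryOperations(retryTemplate)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }
}
```

The resulting interceptor would be passed to factory.setAdviceChain(...) exactly as in the question's listenerFactory bean.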