Spring AMQP: retry context cache not cleared when AmqpRejectAndDontRequeueException is thrown


I have a simple Rabbit listener that always throws AmqpRejectAndDontRequeueException (to test whether the listener can handle multiple invalid messages).

The configuration is:

@RabbitListener(
    id = TestConfig.LISTENER_ID,
    queues = TestConfig.DATA_QUEUE,
    containerFactory = TestConfig.LISTENER_FACTORY
)
public void consume(String data, Message message) {
    if (true) {
        throw new AmqpRejectAndDontRequeueException("don't requeue");
    }
}

// TestConfig (configuration class):
static final String LISTENER_ID = "listenerId";

static final String DATA_QUEUE = "data.queue";

static final String LISTENER_FACTORY = "listenerFactory";

private final AmqpAdmin amqpAdmin;

private final ConnectionFactory connectionFactory;

TestConfig(AmqpAdmin amqpAdmin, ConnectionFactory connectionFactory) {
    this.amqpAdmin = amqpAdmin;
    this.connectionFactory = connectionFactory;
}

@Bean
RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public Queue consumedTripQueue() {
    Queue queue = new Queue(DATA_QUEUE);
    queue.setAdminsThatShouldDeclare(amqpAdmin);
    return queue;
}

@Bean(name = LISTENER_FACTORY)
public SimpleRabbitListenerContainerFactory listenerFactory(RabbitTransactionManager rabbitTransactionManager) {

    StatefulRetryOperationsInterceptor backOffRetryInterceptor = statefulRetryOperationsInterceptor();
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(1);
    factory.setAutoStartup(true);
    factory.setTransactionManager(rabbitTransactionManager);
    factory.setAdviceChain(backOffRetryInterceptor);

    return factory;
}

private StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {

    RejectAndDontRequeueRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer(); // to be changed

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryContextCache(new MapRetryContextCache(3));
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));

    return RetryInterceptorBuilder.stateful()
      .retryOperations(retryTemplate)
      .recoverer(messageRecoverer)
      .build();
}

When AmqpRejectAndDontRequeueException is thrown in the listener, the MapRetryContextCache fills up but is never cleared. The application eventually throws RetryCacheCapacityExceededException.

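I managed to make it work by throwing a custom exception from the listener and handling it in a custom message recoverer. In that case, however, the message is still requeued once, in line with the retry policy.

What am I doing wrong? Should I not use AmqpRejectAndDontRequeueException together with a stateful interceptor? How can I reject a message without requeueing it when a stateful interceptor is in use?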

java rabbitmq spring-amqp spring-rabbit spring-retry
1 Answer

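Spring Retry knows nothing about the semantics of Spring AMQP exceptions.

To avoid this problem, configure the interceptor so that AmqpRejectAndDontRequeueException is treated as a non-retryable exception.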
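
One way to express this classification is sketched below. It is a configuration fragment rather than a verified fix: it reuses the question's RetryTemplate setup and only replaces the retry policy with one that marks AmqpRejectAndDontRequeueException as non-retryable. The holder class name RetryInterceptorSketch is hypothetical, and the choice to inspect exception causes (traverseCauses = true) is an assumption, made in case the exception reaches the interceptor wrapped in another exception. Whether this alone keeps the cache from filling may depend on the Spring Retry version, so it is worth confirming in a test.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical holder class; in the question this method lives in TestConfig.
class RetryInterceptorSketch {

    static StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
        // Explicitly classify AmqpRejectAndDontRequeueException as NOT retryable.
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
        retryableExceptions.put(AmqpRejectAndDontRequeueException.class, false);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(
                1,                    // maxAttempts, same value as in the question
                retryableExceptions,
                true,                 // traverseCauses: also check wrapped causes (assumption)
                true);                // defaultValue: all other exceptions stay retryable

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryContextCache(new MapRetryContextCache(3));
        retryTemplate.setRetryPolicy(retryPolicy);

        return RetryInterceptorBuilder.stateful()
                .retryOperations(retryTemplate)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }
}
```

The four-argument SimpleRetryPolicy constructor is used here because it lets the default classification stay "retryable" while a single exception type is excluded.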
