BackOff 和事务性不适用于消息处理,仅适用于侦听器设置

问题描述 投票:0回答:1

我有一个接收消息的 IbmMqEndpoint。我想要的是,如果在消息处理过程中发生错误,消息将保留在原始队列中。以下实现可以完美运行并处理无错误的消息。

   @Bean(name = "ibmMqListenerContainerFactory")
   public DefaultJmsListenerContainerFactory ibmMqListenerContainerFactory(
                                             @Qualifier("ibmMqConnectionFactory") ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setSessionTransacted(true);
    factory.setBackOff(new FixedBackOff(5000L, 3));
    factory.setErrorHandler(new ErrorHandler() {
        @Override
        public void handleError(Throwable t) {
            logger.warn("spring jms custom error handling example");
            logger.error(t.getCause().getMessage());
            throw new MessageSendException("Error", new Exception("Test"));
        }
    });
    return factory;
}

以下注册MessageListener:

private SimpleJmsListenerEndpoint getJmsListenerEndpoint(String endpoint) {
    SimpleJmsListenerEndpoint jmsListenerEndpoint = new SimpleJmsListenerEndpoint();
    jmsListenerEndpoint.setId(endpoint);
    jmsListenerEndpoint.setDestination(endpoint);

    jmsListenerEndpoint.setMessageListener(m -> {
        try {

            byte[] body = m.getBody(byte[].class);
            MyMessage myMessage = new MyMessage(body);
            messageHandler.accept(myMessage);
  
        } catch MessageSendException | JMSException e) {
            throw new MessageSendException(e);
        }
    });
    return jmsListenerEndpoint;
}

但是,在设置 messageListener 并发送错误消息后,会抛出 MessageSentException。底层的 DefaultMessageListenerContainer 实现通过在方法中调用 backOff.start() 来重置 BackOff 变量 currentAttempts

 private void waitBeforeRecoveryAttempt() {
        BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
        DefaultMessageListenerContainer.this.applyBackOffTime(execution);
    }

引起我注意的是,调用了DefaultMessageListenerContainer中的handleListenerSetupFailure方法。但这不是处理程序设置失败,不是吗?我认为这是消息处理失败,因为设置是在应用程序启动时进行的。

可能我在消息重写到原始队列方面走错了路,但它看起来是 setSessionTransacted 的一个好方法。这里有 Spring JMS 专家可以向我解释如何使用 spring-boot 框架实现三个退避尝试吗?一般来说,我不明白为什么 DefaultMessageListenerContainer 有一个 backOff 选项,它总是默认为无限循环......

spring-boot jms ibm-mq spring-jms exponential-backoff
1个回答
0
投票

只要正确实现了消息的回退计数,并且为您正在侦听的队列定义了回退阈值和回退队列,那么底层 IBM JMS 代码就应该将消息移动到回退队列,当回退时计数达到阈值。

如果没有发生这种情况,请检查回退计数是否正在增加。如果该值正在增加,则检查是否设置了中断队列和阈值。您可以使用此repo

中的以下示例代码将这些记录到您的代码中
import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class CmdRunner115 implements CommandLineRunner{
    protected final Log logger = LogFactory.getLog(getClass());

    @Value("${app.l115.queue.name1:DEV.QUEUE.1}")
    private String queueName = "";

    private final MQQueueManager mqQueueManager;

    CmdRunner115(MQQueueManager mqQueueManager) {
        this.mqQueueManager = mqQueueManager;
    }

    @Override
    public void run(String ... args) throws Exception{
        logger.info("Determining Backout threshold");
            try {
                int[] selectors = {
                        CMQC.MQIA_BACKOUT_THRESHOLD,
                        CMQC.MQCA_BACKOUT_REQ_Q_NAME };
                int[] intAttrs = new int[1];
                byte[] charAttrs = new byte[CMQC.MQ_Q_NAME_LENGTH];

                int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF | CMQC.MQOO_INQUIRE | CMQC.MQOO_SAVE_ALL_CONTEXT;
                MQQueue myQueue = mqQueueManager.accessQueue(queueName, openOptions, null, null, null);
                logger.info("Queue Obtained");

                MQManagedObject moMyQueue = (MQManagedObject) myQueue;
                moMyQueue.inquire(selectors, intAttrs, charAttrs);

                int boThresh = intAttrs[0];
                String backoutQname = new String(charAttrs);

                logger.info("Backout Threshold: " + boThresh);
                logger.info("Backout Queue: " + backoutQname);

            } catch (MQException e) {
                logger.warn("MQException Error obtaining backout threshold");
                logger.warn(e.getMessage());
        }
    }
}

如果您有递增的回退计数、回退阈值和回退队列,并且消息未移至回退队列,请检查是否存在指示回退队列访问失败的错误。

© www.soinside.com 2019 - 2024. All rights reserved.