我有一个接收消息的 IbmMqEndpoint。我想要的是,如果在消息处理过程中发生错误,消息将保留在原始队列中。以下实现可以完美运行并处理无错误的消息。
@Bean(name = "ibmMqListenerContainerFactory")
public DefaultJmsListenerContainerFactory ibmMqListenerContainerFactory(
@Qualifier("ibmMqConnectionFactory") ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(true);
factory.setBackOff(new FixedBackOff(5000L, 3));
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
logger.warn("spring jms custom error handling example");
logger.error(t.getCause().getMessage());
throw new MessageSendException("Error", new Exception("Test"));
}
});
return factory;
}
以下注册MessageListener:
private SimpleJmsListenerEndpoint getJmsListenerEndpoint(String endpoint) {
SimpleJmsListenerEndpoint jmsListenerEndpoint = new SimpleJmsListenerEndpoint();
jmsListenerEndpoint.setId(endpoint);
jmsListenerEndpoint.setDestination(endpoint);
jmsListenerEndpoint.setMessageListener(m -> {
try {
byte[] body = m.getBody(byte[].class);
MyMessage myMessage = new MyMessage(body);
messageHandler.accept(myMessage);
} catch MessageSendException | JMSException e) {
throw new MessageSendException(e);
}
});
return jmsListenerEndpoint;
}
但是,在设置 messageListener 并发送错误消息后,会抛出 MessageSentException。底层的 DefaultMessageListenerContainer 实现通过在方法中调用 backOff.start() 来重置 BackOff 变量 currentAttempts
private void waitBeforeRecoveryAttempt() {
BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
DefaultMessageListenerContainer.this.applyBackOffTime(execution);
}
引起我注意的是,调用了DefaultMessageListenerContainer中的handleListenerSetupFailure方法。但这不是处理程序设置失败,不是吗?我认为这是消息处理失败,因为设置是在应用程序启动时进行的。
可能我在消息重写到原始队列方面走错了路,但它看起来是 setSessionTransacted 的一个好方法。这里有 Spring JMS 专家可以向我解释如何使用 spring-boot 框架实现三个退避尝试吗?一般来说,我不明白为什么 DefaultMessageListenerContainer 有一个 backOff 选项,它总是默认为无限循环......
只要正确实现了消息的回退计数,并且为您正在侦听的队列定义了回退阈值和回退队列,那么底层 IBM JMS 代码就应该将消息移动到回退队列,当回退时计数达到阈值。
如果没有发生这种情况,请检查回退计数是否正在增加。如果该值正在增加,则检查是否设置了中断队列和阈值。您可以使用此repo
中的以下示例代码将这些记录到您的代码中import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class CmdRunner115 implements CommandLineRunner{
protected final Log logger = LogFactory.getLog(getClass());
@Value("${app.l115.queue.name1:DEV.QUEUE.1}")
private String queueName = "";
private final MQQueueManager mqQueueManager;
CmdRunner115(MQQueueManager mqQueueManager) {
this.mqQueueManager = mqQueueManager;
}
@Override
public void run(String ... args) throws Exception{
logger.info("Determining Backout threshold");
try {
int[] selectors = {
CMQC.MQIA_BACKOUT_THRESHOLD,
CMQC.MQCA_BACKOUT_REQ_Q_NAME };
int[] intAttrs = new int[1];
byte[] charAttrs = new byte[CMQC.MQ_Q_NAME_LENGTH];
int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF | CMQC.MQOO_INQUIRE | CMQC.MQOO_SAVE_ALL_CONTEXT;
MQQueue myQueue = mqQueueManager.accessQueue(queueName, openOptions, null, null, null);
logger.info("Queue Obtained");
MQManagedObject moMyQueue = (MQManagedObject) myQueue;
moMyQueue.inquire(selectors, intAttrs, charAttrs);
int boThresh = intAttrs[0];
String backoutQname = new String(charAttrs);
logger.info("Backout Threshold: " + boThresh);
logger.info("Backout Queue: " + backoutQname);
} catch (MQException e) {
logger.warn("MQException Error obtaining backout threshold");
logger.warn(e.getMessage());
}
}
}
如果您有递增的回退计数、回退阈值和回退队列,并且消息未移至回退队列,请检查是否存在指示回退队列访问失败的错误。