这是一旦发生异常就在 DLQ 上写入错误的代码。
errorHandler(deadLetterChannel("jms:queue:" + this.dlq+"?allowAdditionalHeaders=*")
.onExceptionOccurred(exchange -> {
Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
boolean shouldRouteToDLQ = false;
int reasonCode = MQException.MQRC_NONE;
Throwable cause = exception;
while (cause != null) {
if (cause instanceof MQException) {
MQException mqException = (MQException) cause;
if (mqException.reasonCode == MQException.MQRC_Q_FULL ||
mqException.reasonCode == MQException.MQRC_UNKNOWN_OBJECT_NAME || mqException.reasonCode == MQException.MQRC_CONNECTION_BROKEN) {
reasonCode = mqException.reasonCode;
shouldRouteToDLQ = true;
break;
}
}
cause = cause.getCause();
}
if (shouldRouteToDLQ) {
exchange.getIn().setHeader(JmsConstants.JMS_IBM_FORMAT, MQConstants.MQFMT_DEAD_LETTER_HEADER );
// Retrieve the current endpoint from headers
String currentQueue = exchange.getIn().getHeader("CurrentQueue", String.class);
String currentQueueManager = exchange.getIn().getHeader("CurrentQueueManager", String.class);
log.info("Current queue {} Current QueueManager {}", currentQueue, currentQueueManager);
MQDLH mqdlh = new MQDLH();
mqdlh.setCodedCharSetId(1208); // Set to match the MQ Manager's CCSID (UTF-8)
mqdlh.setEncoding(MQConstants.MQENC_NATIVE); // Use native encoding
log.info("Reason Code" + reasonCode);
mqdlh.setReason(MQConstants.MQRC_Q_FULL);
mqdlh.setDestQName(currentQueue.replace(".ERROR","")); //For simulation
mqdlh.setDestQMgrName(currentQueueManager);
mqdlh.setPutApplName(appName);
mqdlh.setFormat(MQConstants.MQFMT_STRING);
mqdlh.setPutDate(new SimpleDateFormat("yyyyMMdd").format(new Date()));
mqdlh.setPutTime(new SimpleDateFormat("HHmmssSS").format(new Date()).substring(0, 8));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
mqdlh.write(dos);
byte[] mqdlhBytes = baos.toByteArray();
log.info("CCSID: " + mqdlh.getCodedCharSetId());
log.info("Encoding: " + mqdlh.getEncoding());
log.info("MQ DLH String: " + new String(mqdlhBytes, StandardCharsets.UTF_8));
// For debugging: Print the byte array of the MQDLH header
log.info("MQDLH Bytes: " + Arrays.toString(mqdlhBytes));
// Check the reason code bytes within the header
byte[] reasonCodeBytes = ByteBuffer.allocate(4).putInt(MQException.MQRC_Q_FULL).array();
log.info("Reason Code Bytes: " + Arrays.toString(reasonCodeBytes));
// Verify reason code bytes in MQDLH
byte[] reasonCodeBytes2 = Arrays.copyOfRange(mqdlhBytes, 8, 12);
log.info("Reason Code Bytes in MQDLH: {}", Arrays.toString(reasonCodeBytes2));
byte[] bodyBytes = exchange.getIn().getBody(String.class).getBytes(StandardCharsets.UTF_8);
// Ensure the body is correctly converted to bytes
String body = exchange.getIn().getBody(String.class);
log.info("Original message body: {}", body);
byte[] bodyBytes2 = body.getBytes(StandardCharsets.UTF_8);
log.info("Body Bytes: " + Arrays.toString(bodyBytes));
byte[] combinedMessage = new byte[mqdlhBytes.length + bodyBytes.length];
System.arraycopy(mqdlhBytes, 0, combinedMessage, 0, mqdlhBytes.length);
System.arraycopy(bodyBytes, 0, combinedMessage, mqdlhBytes.length, bodyBytes.length);
log.info("Combined Bytes: " + Arrays.toString(combinedMessage));
exchange.getIn().setBody(combinedMessage);
log.info("Logging JMS Headers {} ", exchange.getIn().getHeaders().toString());
// Set encoding and CCSID in JMS headers if needed
exchange.getIn().setHeaders(exchange.getIn().getHeaders());
exchange.getIn().setHeader("appIdentity","VOS");
exchange.getIn().setHeader("example","value");
//exchange.getIn().setHeader("JMS_IBM_MQMD_example","JMS_IBM_MQMD_value");
exchange.getIn().setHeader("CurrentQueue",currentQueue.replace(".ERROR",""));
exchange.getIn().setHeader(JmsConstants.JMS_IBM_MQMD_APPLIDENTITYDATA, "VOS");
exchange.getIn().setHeader(JmsConstants.JMS_IBM_MQMD_PUTAPPLNAME, "VOSAPP");
exchange.getIn().setHeader("JMS_IBM_Encoding", MQConstants.MQENC_NATIVE);
log.info("Logging JMS Headers after setting encoding {} ", exchange.getIn().getHeaders());
exchange.setProperty("FAILURE_ROUTE_TO_DLQ", true);
log.info("Headers {} Body {}",exchange.getIn().getHeaders(), new String(exchange.getIn().getBody(byte[].class),StandardCharsets.UTF_8));
log.error("MQ Error Occured. Message will be sent to DLQ. Headers {} Message Body {} Error {}",exchange.getIn().getHeaders(), new String(exchange.getIn().getBody(byte[].class),StandardCharsets.UTF_8), dlq, exception);
} else {
log.error("For Route Id {} Headers {} and Message {} An error occurred: {}", exchange.getFromRouteId(),exchange.getIn().getHeaders(),exchange.getIn().getBody(), exception.getMessage());
exchange.setProperty("FAILURE_ROUTE_TO_DLQ", false);
}
})
.onPrepareFailure(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
log.info("Message before DLQ: " + exchange.getIn().getHeaders());
}
})
.maximumRedeliveries(3)
.redeliveryDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.WARN)
.onRedelivery(exchange -> {
if (Boolean.FALSE.equals(exchange.getProperty("FAILURE_ROUTE_TO_DLQ"))) {
// If not routing to DLQ, handle the redelivery differently, e.g., log and stop redelivery
log.error("Redelivery stopped due to non-MQ specific error");
throw new Exception("Redelivery stopped due to non-MQ specific error. Message: "+exchange.getIn().getBody());
}
})
.logHandled(true)
);
这是我的日志输出,从中我可以看到 JMS_IBM_MQMD_ApplIdentityData 已设置。
Message before DLQ: {appIdentity=HENRY, CamelMessageTimestamp=1723465932100, CurrentQueue=EXAMPLE.ISB.HENRY.ISO, CurrentQueueManager=EXAMPLE.DEV.QMGR, example=value, JMS_IBM_Character_Set=ISO-8859-1, JMS_IBM_Encoding=273, JMS_IBM_Format=MQDEAD , JMS_IBM_MQMD_ApplIdentityData=HENRY, JMS_IBM_MQMD_PutApplName=HENRYAPP, JMS_IBM_MsgType=8, JMS_IBM_PutApplType=6, JMS_IBM_PutDate=20240812, JMS_IBM_PutTime=12321210, JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=1, JMSDestination=null, JMSExpiration=0, JMSMessageID=ID:414d51204750502e4445562e514d47526ecab96608879521, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1723465932100, JMSType=null, JMSXAppID=swx , JMSXDeliveryCount=1, JMSXGroupID=null, JMSXUserID=EXAMPLE }
所有其他标头都已正确填充,但当我使用 IBM MQ Explorer 查看消息时,JMS_IBM_MQMD_ApplIdentityData 未填充。
MQMD结构中有8个字段比较特殊,需要授权:
起源上下文:
身份背景:
在使用任何这些 MQMD 字段之前,您需要告诉 MQ 您的代码将设置/更新这些字段,然后 MQ 将检查您是否有权执行您想要执行的操作。
您可以在此处的 IBM MQ 文档中阅读相关内容:https://www.ibm.com/docs/en/ibm-mq/latest?topic=application-jms-message-object-properties