Apache Camel DeadLetterChannel does not carry JMS_IBM_MQMD_ApplIdentityData when writing to the dead letter queue


This is the code that writes the failed message to the DLQ as soon as an exception occurs.

errorHandler(deadLetterChannel("jms:queue:" + this.dlq+"?allowAdditionalHeaders=*")
                .onExceptionOccurred(exchange -> {
                    Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
                    boolean shouldRouteToDLQ = false;
                    int reasonCode = MQException.MQRC_NONE;
                    Throwable cause = exception;
                    while (cause != null) {
                        if (cause instanceof MQException) {
                            MQException mqException = (MQException) cause;
                            if (mqException.reasonCode == MQException.MQRC_Q_FULL ||
                                    mqException.reasonCode == MQException.MQRC_UNKNOWN_OBJECT_NAME || mqException.reasonCode == MQException.MQRC_CONNECTION_BROKEN) {
                                reasonCode = mqException.reasonCode;
                                shouldRouteToDLQ = true;
                                break;
                            }
                        }
                        cause = cause.getCause();
                    }

                    if (shouldRouteToDLQ) {
                        exchange.getIn().setHeader(JmsConstants.JMS_IBM_FORMAT, MQConstants.MQFMT_DEAD_LETTER_HEADER );
                        // Retrieve the current endpoint from headers
                        String currentQueue = exchange.getIn().getHeader("CurrentQueue", String.class);
                        String currentQueueManager = exchange.getIn().getHeader("CurrentQueueManager", String.class);
                        log.info("Current queue {} Current QueueManager {}", currentQueue, currentQueueManager);
                        MQDLH mqdlh = new MQDLH();
                        mqdlh.setCodedCharSetId(1208); // Set to match the MQ Manager's CCSID (UTF-8)
                        mqdlh.setEncoding(MQConstants.MQENC_NATIVE); // Use native encoding

                        log.info("Reason Code" + reasonCode);
                        mqdlh.setReason(MQConstants.MQRC_Q_FULL);
                        mqdlh.setDestQName(currentQueue.replace(".ERROR","")); //For simulation
                        mqdlh.setDestQMgrName(currentQueueManager);
                        mqdlh.setPutApplName(appName);
                        mqdlh.setFormat(MQConstants.MQFMT_STRING);
                        mqdlh.setPutDate(new SimpleDateFormat("yyyyMMdd").format(new Date()));
                        mqdlh.setPutTime(new SimpleDateFormat("HHmmssSS").format(new Date()).substring(0, 8));

                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        DataOutputStream dos = new DataOutputStream(baos);
                        mqdlh.write(dos);
                        byte[] mqdlhBytes = baos.toByteArray();

                        log.info("CCSID: " + mqdlh.getCodedCharSetId());
                        log.info("Encoding: " + mqdlh.getEncoding());

                        log.info("MQ DLH String: " + new String(mqdlhBytes, StandardCharsets.UTF_8));
                        // For debugging: Print the byte array of the MQDLH header
                        log.info("MQDLH Bytes: " + Arrays.toString(mqdlhBytes));

                        // Check the reason code bytes within the header
                        byte[] reasonCodeBytes = ByteBuffer.allocate(4).putInt(MQException.MQRC_Q_FULL).array();
                        log.info("Reason Code Bytes: " + Arrays.toString(reasonCodeBytes));

                        // Verify reason code bytes in MQDLH
                        byte[] reasonCodeBytes2 = Arrays.copyOfRange(mqdlhBytes, 8, 12);
                        log.info("Reason Code Bytes in MQDLH: {}", Arrays.toString(reasonCodeBytes2));

                        byte[] bodyBytes = exchange.getIn().getBody(String.class).getBytes(StandardCharsets.UTF_8);

                        // Ensure the body is correctly converted to bytes
                        String body = exchange.getIn().getBody(String.class);
                        log.info("Original message body: {}", body);
                        byte[] bodyBytes2 = body.getBytes(StandardCharsets.UTF_8);
                        log.info("Body Bytes: " + Arrays.toString(bodyBytes));

                        byte[] combinedMessage = new byte[mqdlhBytes.length + bodyBytes.length];
                        System.arraycopy(mqdlhBytes, 0, combinedMessage, 0, mqdlhBytes.length);
                        System.arraycopy(bodyBytes, 0, combinedMessage, mqdlhBytes.length, bodyBytes.length);
                        log.info("Combined Bytes: " + Arrays.toString(combinedMessage));
                        exchange.getIn().setBody(combinedMessage);

                        log.info("Logging JMS Headers {} ", exchange.getIn().getHeaders().toString());
                        // Set encoding and CCSID in JMS headers if needed
                        exchange.getIn().setHeaders(exchange.getIn().getHeaders());
                        exchange.getIn().setHeader("appIdentity","VOS");
                        exchange.getIn().setHeader("example","value");
                        //exchange.getIn().setHeader("JMS_IBM_MQMD_example","JMS_IBM_MQMD_value");
                        exchange.getIn().setHeader("CurrentQueue",currentQueue.replace(".ERROR",""));
                        exchange.getIn().setHeader(JmsConstants.JMS_IBM_MQMD_APPLIDENTITYDATA, "VOS");
                        exchange.getIn().setHeader(JmsConstants.JMS_IBM_MQMD_PUTAPPLNAME, "VOSAPP");
                        exchange.getIn().setHeader("JMS_IBM_Encoding", MQConstants.MQENC_NATIVE);
                        log.info("Logging JMS Headers after setting encoding {} ", exchange.getIn().getHeaders());
                        exchange.setProperty("FAILURE_ROUTE_TO_DLQ", true);
                        log.info("Headers {} Body {}",exchange.getIn().getHeaders(), new String(exchange.getIn().getBody(byte[].class),StandardCharsets.UTF_8));
                        log.error("MQ error occurred. Message will be sent to DLQ. Headers {} Message Body {} DLQ {}", exchange.getIn().getHeaders(), new String(exchange.getIn().getBody(byte[].class), StandardCharsets.UTF_8), dlq, exception);

                    } else {
                        log.error("For Route Id {} Headers {} and Message {} An error occurred: {}", exchange.getFromRouteId(),exchange.getIn().getHeaders(),exchange.getIn().getBody(), exception.getMessage());
                        exchange.setProperty("FAILURE_ROUTE_TO_DLQ", false);
                    }
                })
                .onPrepareFailure(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        log.info("Message before DLQ: " + exchange.getIn().getHeaders());
                    }
                })
                .maximumRedeliveries(3)
                .redeliveryDelay(1000)
                .retryAttemptedLogLevel(LoggingLevel.WARN)
                .onRedelivery(exchange -> {
                    if (Boolean.FALSE.equals(exchange.getProperty("FAILURE_ROUTE_TO_DLQ"))) {
                        // If not routing to DLQ, handle the redelivery differently, e.g., log and stop redelivery
                        log.error("Redelivery stopped due to non-MQ specific error");
                        throw new Exception("Redelivery stopped due to non-MQ specific error. Message: "+exchange.getIn().getBody());
                    }
                })
                .logHandled(true)
        );

Here is my log output, which shows that JMS_IBM_MQMD_ApplIdentityData has been set.

Message before DLQ: {appIdentity=HENRY, CamelMessageTimestamp=1723465932100, CurrentQueue=EXAMPLE.ISB.HENRY.ISO, CurrentQueueManager=EXAMPLE.DEV.QMGR, example=value, JMS_IBM_Character_Set=ISO-8859-1, JMS_IBM_Encoding=273, JMS_IBM_Format=MQDEAD  , JMS_IBM_MQMD_ApplIdentityData=HENRY, JMS_IBM_MQMD_PutApplName=HENRYAPP, JMS_IBM_MsgType=8, JMS_IBM_PutApplType=6, JMS_IBM_PutDate=20240812, JMS_IBM_PutTime=12321210, JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=1, JMSDestination=null, JMSExpiration=0, JMSMessageID=ID:414d51204750502e4445562e514d47526ecab96608879521, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1723465932100, JMSType=null, JMSXAppID=swx                         , JMSXDeliveryCount=1, JMSXGroupID=null, JMSXUserID=EXAMPLE         }

All the other headers are populated correctly, but when I view the message in IBM MQ Explorer, JMS_IBM_MQMD_ApplIdentityData is not populated.


java apache-camel ibm-mq spring-camel dlq
1 Answer

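Eight fields in the MQMD structure are special and require authorization:

Origin context:

  • PutApplType
  • PutApplName
  • PutDate
  • PutTime
  • ApplOriginData

Identity context:

  • UserIdentifier
  • AccountingToken
  • ApplIdentityData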
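
To relate the two groups above to the headers in the question, here is a minimal sketch that reports which context level a set of header names would need. The class `MqmdContextFields` and its enum are hypothetical helpers, not part of Camel or the IBM MQ client. The header names assume the `JMS_IBM_MQMD_` prefix followed by the MQMD field name, as seen in the question's log for `JMS_IBM_MQMD_ApplIdentityData` and `JMS_IBM_MQMD_PutApplName`.

```java
import java.util.Set;

/** Hypothetical helper: classifies JMS_IBM_MQMD_* header names by MQMD context group. */
final class MqmdContextFields {

    /** Context level the application would have to request before the put. */
    enum Required { NONE, IDENTITY, ALL }

    // Identity context fields (assumed naming: "JMS_IBM_MQMD_" + MQMD field name).
    private static final Set<String> IDENTITY = Set.of(
            "JMS_IBM_MQMD_UserIdentifier",
            "JMS_IBM_MQMD_AccountingToken",
            "JMS_IBM_MQMD_ApplIdentityData");

    // Origin context fields; setting any of these needs "all context".
    private static final Set<String> ORIGIN = Set.of(
            "JMS_IBM_MQMD_PutApplType",
            "JMS_IBM_MQMD_PutApplName",
            "JMS_IBM_MQMD_PutDate",
            "JMS_IBM_MQMD_PutTime",
            "JMS_IBM_MQMD_ApplOriginData");

    private MqmdContextFields() { }

    /** Returns the strongest context level implied by the given header names. */
    static Required requiredContext(Iterable<String> headerNames) {
        Required result = Required.NONE;
        for (String name : headerNames) {
            if (ORIGIN.contains(name)) {
                return Required.ALL; // origin fields dominate
            }
            if (IDENTITY.contains(name)) {
                result = Required.IDENTITY;
            }
        }
        return result;
    }
}
```

For the question's code, `JMS_IBM_MQMD_ApplIdentityData` alone maps to `IDENTITY`, while adding `JMS_IBM_MQMD_PutApplName` raises the requirement to `ALL`.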

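Before you use any of these MQMD fields, you must tell MQ that your code intends to set or update them; MQ then checks whether you are authorized to do what you are asking for.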

You can read about this in the IBM MQ documentation here: https://www.ibm.com/docs/en/ibm-mq/latest?topic=application-jms-message-object-properties
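
With the IBM MQ classes for JMS, "telling MQ" is normally done on the destination, through the `WMQ_MQMD_WRITE_ENABLED` and `WMQ_MQMD_MESSAGE_CONTEXT` destination properties. The sketch below only builds a destination URI string that carries those two settings. It assumes the URI property names `mdWriteEnabled` and `mdMessageContext`, and the numeric values 1 (set identity context) and 2 (set all context), which should be checked against the documentation for your client version. The class `MqmdDestinationUri` is a hypothetical helper, and how the resulting name reaches the Camel JMS producer (for example through a custom destination resolver) is not shown.

```java
/** Hypothetical helper: builds an IBM MQ destination URI that opts in to writing MQMD fields. */
final class MqmdDestinationUri {

    // Assumed to mirror WMQConstants.WMQ_MDCTX_SET_IDENTITY_CONTEXT and
    // WMQConstants.WMQ_MDCTX_SET_ALL_CONTEXT; verify for your client version.
    static final int SET_IDENTITY_CONTEXT = 1;
    static final int SET_ALL_CONTEXT = 2;

    private MqmdDestinationUri() { }

    /**
     * Returns a URI such as queue:///MY.DLQ?mdWriteEnabled=true&mdMessageContext=1.
     * The queue manager part is left empty, meaning the connected queue manager.
     */
    static String build(String queueName, int messageContext) {
        if (queueName == null || queueName.trim().isEmpty()) {
            throw new IllegalArgumentException("queueName must not be empty");
        }
        if (messageContext != SET_IDENTITY_CONTEXT && messageContext != SET_ALL_CONTEXT) {
            throw new IllegalArgumentException("unsupported message context: " + messageContext);
        }
        return "queue:///" + queueName.trim()
                + "?mdWriteEnabled=true"
                + "&mdMessageContext=" + messageContext;
    }

    public static void main(String[] args) {
        // ApplIdentityData is an identity context field, so identity context is enough for it.
        System.out.println(build("MY.DLQ", SET_IDENTITY_CONTEXT));
    }
}
```

Identity context covers ApplIdentityData; setting an origin field such as PutApplName as well would need the "all context" value. In both cases the connecting user also needs the matching authority on the queue, otherwise the put is rejected.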
