在Azure服务总线中,如何使用JMS将消息发送到带有“DeadLetterReason”的死信队列?

问题描述 投票:0回答:1

我正在使用 JMS 侦听器,使用 Java 语言和 Spring 框架围绕 Azure 服务总线构建服务。 我可以找到一种方法将消息直接发送到 DeadLetterQueue,而无需等待重试。如果您知道该错误是致命的并且重试没有意义,那么这很有用。 这是我直接发送到 DLQ 的代码:

@JmsListener(
        destination = "topic-name",
        containerFactory = "topicJmsListenerContainerFactory",
        subscription = "subscription-name"
)
public void receive(JmsMessage message, Session session) throws JMSException, JsonProcessingException, IllegalAccessException, NoSuchFieldException {
    log.info("Message received from {} : {}", subscriptionName, message.getJMSMessageID());
    try {
        StatusDto status = service.process(parseMessageToDto(message), message.getJMSCorrelationID());
        log.info("Message: {} sent with success. Status: {}", message.getJMSCorrelationID(), mapper.writeValueAsString(status));
    } catch (FatalException | JsonProcessingException | ConstraintViolationException exception) {
        serviceBusHelper.moveToDeadLetterQueue(message, session, exception);
    }
}

这是我的 serviceBusHelper,它将消息直接发送到 DLQ。

private final int deadLetterQueue = ProviderConstants.ACK_TYPE.REJECTED.ordinal();

public void moveToDeadLetterQueue(JmsMessage message, Session session, Exception reason)
        throws NoSuchFieldException, IllegalAccessException, JMSException, JsonProcessingException {
    message.setReadOnlyProperties(false);
    String deadLetterReason = reason.getClass().getSimpleName();
    message.setStringProperty("DeadLetterReason", deadLetterReason);
    message.setStringProperty("DeadLetterErrorDescription", reason.getMessage());
    message.setAcknowledgeCallback(buildDlqAcknowledgeCallback(session));
    message.acknowledge();
    log.info("Moved message: {}, to dead letter queue due to: {}", message.getJMSCorrelationID(), deadLetterReason);
}

JmsAcknowledgeCallback buildDlqAcknowledgeCallback(Session session) throws NoSuchFieldException, IllegalAccessException {
    JmsAcknowledgeCallback callback = new JmsAcknowledgeCallback(getInnerSessionFromAzureServiceBus(session));
    callback.setAckType(deadLetterQueue);
    return callback;
}

JmsSession getInnerSessionFromAzureServiceBus(Session session) throws NoSuchFieldException, IllegalAccessException {
    log.debug("Fetch inner session from session with class: {}", session.getClass().getSimpleName());
    Session serviceBusJmsSession = (Session) unProxy(session); // Session is usually a proxy.
    Field field = serviceBusJmsSession.getClass().getDeclaredField("innerSession");
    field.setAccessible(true);
    return (JmsSession) field.get(serviceBusJmsSession);
}

Object unProxy(Object proxy) throws NoSuchFieldException, IllegalAccessException {
    if (!Proxy.isProxyClass(proxy.getClass())) {
        return proxy;
    } else {
        InvocationHandler handler = Proxy.getInvocationHandler(proxy);
        Field targetField = handler.getClass().getDeclaredField("target");
        targetField.setAccessible(true);
        return unProxy(targetField.get(handler));
    }
}

我必须使用反射才能获取 JmsSession 并使用直接向 DLQ 发送消息的 ACK_TYPE.REJECTED 创建一个 recognizeCallback。

最后这是我对 topicJmsListenerContainerFactory 的配置:

spring.jms.servicebus.namespace=namespace
spring.jms.servicebus.pricing-tier=standard
spring.jms.servicebus.passwordless-enabled=true
spring.jms.servicebus.enabled=true
spring.jms.cache.producers=false
spring.jms.template.session.transacted=false
spring.jms.template.session.acknowledge-mode=client
spring.jms.listener.session.transacted=false
spring.jms.listener.session.acknowledge-mode=client
spring.jms.servicebus.listener.subscription-durable=true
spring.jms.listener.receive-timeout=60000

acknowledge-mode=client 允许我确认我的消息并使用将消息发送到 DLQ 的回调。然而,这不允许我使用似乎能够更新和添加消息属性的事务处理会话。

总结一下: 通过此配置和代码,我可以直接向 DLQ 发送消息,但不会添加 DeadLetterReason。您知道或知道如何添加此字符串属性吗?

非常感谢。

azure jms azureservicebus spring-jms
1个回答
0
投票
我尝试使用 Spring Boot 应用程序使用 JMS 将消息移动到

Dead Letter Queue

 并成功接收到它。

    以下代码连接到 Azure 服务总线主题以接收消息,将消息移动到死信队列以显示失败,并使用
  • PEEK_LOCK
     接收模式来锁定和处理消息。

ServiceBusReceiver.java:

import com.azure.messaging.servicebus.ServiceBusClientBuilder; import com.azure.messaging.servicebus.ServiceBusReceiverClient; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; @Service public class ServiceBusReceiver { @Value("${azure.servicebus.connection-string}") private String connectionString; @Value("${azure.servicebus.topic-name}") private String topicName; @Value("${azure.servicebus.subscription-name}") private String subscriptionName; private final JmsTemplate jmsTemplate; public ServiceBusReceiver(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @JmsListener(destination = "${azure.servicebus.topic-name}", containerFactory = "jmsListenerContainerFactory") public void receiveMessages() { try (ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder() .connectionString(connectionString) .receiver() .topicName(topicName) .subscriptionName(subscriptionName) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildClient()) { receiverClient.receiveMessages(1).forEach(message -> { try { System.out.println("Simulating failure for message: " + message.getMessageId()); receiverClient.deadLetter(message); System.out.println("Message " + message.getMessageId() + " moved to Dead Letter Queue."); } catch (Exception e) { System.out.println("Failed to process message " + message.getMessageId() + ": " + e.getMessage()); } }); } } }

ServiceBusReceiverController.java:

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ServiceBusReceiverController { @Autowired private ServiceBusReceiver serviceBusReceiver; @GetMapping("/receive-messages") public String receiveMessages() { serviceBusReceiver.receiveMessages(); return "Message moved to Dead Letter queue."; } }

pom.xml:

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> <version>2.5.4</version> </dependency> <dependency> <groupId>com.azure</groupId> <artifactId>azure-messaging-servicebus</artifactId> <version>7.12.0</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>com.azure.spring</groupId> <artifactId>azure-spring-boot-starter-servicebus-jms</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>

应用程序属性:

azure.servicebus.connection-string=<TopicConneString> azure.servicebus.topic-name=<topicName> azure.servicebus.subscription-name=<subscriptionName>
我运行了上面的 Spring Boot 应用程序并向 Azure 服务总线主题发送了一条消息,如下所示。

enter image description here

我在浏览器中发送了下面的请求,如图所示。

enter image description here

然后,我在Azure Service Bus Topic的

Dead Letter Queue

中收到了消息,如下图。

enter image description here

Spring Boot 输出:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.