我正在使用 JMS 侦听器,使用 Java 语言和 Spring 框架围绕 Azure 服务总线构建服务。 我可以找到一种方法将消息直接发送到 DeadLetterQueue,而无需等待重试。如果您知道该错误是致命的并且重试没有意义,那么这很有用。 这是我直接发送到 DLQ 的代码:
@JmsListener(
destination = "topic-name",
containerFactory = "topicJmsListenerContainerFactory",
subscription = "subscription-name"
)
public void receive(JmsMessage message, Session session) throws JMSException, JsonProcessingException, IllegalAccessException, NoSuchFieldException {
log.info("Message received from {} : {}", subscriptionName, message.getJMSMessageID());
try {
StatusDto status = service.process(parseMessageToDto(message), message.getJMSCorrelationID());
log.info("Message: {} sent with success. Status: {}", message.getJMSCorrelationID(), mapper.writeValueAsString(status));
} catch (FatalException | JsonProcessingException | ConstraintViolationException exception) {
serviceBusHelper.moveToDeadLetterQueue(message, session, exception);
}
}
这是我的 serviceBusHelper,它将消息直接发送到 DLQ。
private final int deadLetterQueue = ProviderConstants.ACK_TYPE.REJECTED.ordinal();
public void moveToDeadLetterQueue(JmsMessage message, Session session, Exception reason)
throws NoSuchFieldException, IllegalAccessException, JMSException, JsonProcessingException {
message.setReadOnlyProperties(false);
String deadLetterReason = reason.getClass().getSimpleName();
message.setStringProperty("DeadLetterReason", deadLetterReason);
message.setStringProperty("DeadLetterErrorDescription", reason.getMessage());
message.setAcknowledgeCallback(buildDlqAcknowledgeCallback(session));
message.acknowledge();
log.info("Moved message: {}, to dead letter queue due to: {}", message.getJMSCorrelationID(), deadLetterReason);
}
JmsAcknowledgeCallback buildDlqAcknowledgeCallback(Session session) throws NoSuchFieldException, IllegalAccessException {
JmsAcknowledgeCallback callback = new JmsAcknowledgeCallback(getInnerSessionFromAzureServiceBus(session));
callback.setAckType(deadLetterQueue);
return callback;
}
JmsSession getInnerSessionFromAzureServiceBus(Session session) throws NoSuchFieldException, IllegalAccessException {
log.debug("Fetch inner session from session with class: {}", session.getClass().getSimpleName());
Session serviceBusJmsSession = (Session) unProxy(session); // Session is usually a proxy.
Field field = serviceBusJmsSession.getClass().getDeclaredField("innerSession");
field.setAccessible(true);
return (JmsSession) field.get(serviceBusJmsSession);
}
Object unProxy(Object proxy) throws NoSuchFieldException, IllegalAccessException {
if (!Proxy.isProxyClass(proxy.getClass())) {
return proxy;
} else {
InvocationHandler handler = Proxy.getInvocationHandler(proxy);
Field targetField = handler.getClass().getDeclaredField("target");
targetField.setAccessible(true);
return unProxy(targetField.get(handler));
}
}
我必须使用反射才能获取 JmsSession 并使用直接向 DLQ 发送消息的 ACK_TYPE.REJECTED 创建一个 recognizeCallback。
最后这是我对 topicJmsListenerContainerFactory 的配置:
spring.jms.servicebus.namespace=namespace
spring.jms.servicebus.pricing-tier=standard
spring.jms.servicebus.passwordless-enabled=true
spring.jms.servicebus.enabled=true
spring.jms.cache.producers=false
spring.jms.template.session.transacted=false
spring.jms.template.session.acknowledge-mode=client
spring.jms.listener.session.transacted=false
spring.jms.listener.session.acknowledge-mode=client
spring.jms.servicebus.listener.subscription-durable=true
spring.jms.listener.receive-timeout=60000
acknowledge-mode=client 允许我确认我的消息并使用将消息发送到 DLQ 的回调。然而,这不允许我使用似乎能够更新和添加消息属性的事务处理会话。
总结一下: 通过此配置和代码,我可以直接向 DLQ 发送消息,但不会添加 DeadLetterReason。您知道或知道如何添加此字符串属性吗?
非常感谢。
Dead Letter Queue
并成功接收到它。
PEEK_LOCK
接收模式来锁定和处理消息。
ServiceBusReceiver.java:
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
@Service
public class ServiceBusReceiver {
@Value("${azure.servicebus.connection-string}")
private String connectionString;
@Value("${azure.servicebus.topic-name}")
private String topicName;
@Value("${azure.servicebus.subscription-name}")
private String subscriptionName;
private final JmsTemplate jmsTemplate;
public ServiceBusReceiver(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@JmsListener(destination = "${azure.servicebus.topic-name}", containerFactory = "jmsListenerContainerFactory")
public void receiveMessages() {
try (ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.receiver()
.topicName(topicName)
.subscriptionName(subscriptionName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.buildClient()) {
receiverClient.receiveMessages(1).forEach(message -> {
try {
System.out.println("Simulating failure for message: " + message.getMessageId());
receiverClient.deadLetter(message);
System.out.println("Message " + message.getMessageId() + " moved to Dead Letter Queue.");
} catch (Exception e) {
System.out.println("Failed to process message " + message.getMessageId() + ": " + e.getMessage());
}
});
}
}
}
ServiceBusReceiverController.java:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ServiceBusReceiverController {
@Autowired
private ServiceBusReceiver serviceBusReceiver;
@GetMapping("/receive-messages")
public String receiveMessages() {
serviceBusReceiver.receiveMessages();
return "Message moved to Dead Letter queue.";
}
}
pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.5.4</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.12.0</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>azure-spring-boot-starter-servicebus-jms</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
应用程序属性:
azure.servicebus.connection-string=<TopicConneString>
azure.servicebus.topic-name=<topicName>
azure.servicebus.subscription-name=<subscriptionName>
我运行了上面的 Spring Boot 应用程序并向 Azure 服务总线主题发送了一条消息,如下所示。
我在浏览器中发送了下面的请求,如图所示。
然后,我在Azure Service Bus Topic的
Dead Letter Queue
中收到了消息,如下图。
Spring Boot 输出: