我有一个(WildFly 33 上的 Java 21/EE10)容器,它使用 JMS 2.0(因此是 ESB 高级层)连接到 Azure 服务总线。 一切都好。
但是,当重新部署(甚至重新启动)POD 时,激活监听的 @Startup 函数会失败,并出现以下情况: 客户消费者在设置服务总线时出错:jakarta.jms.JMSRuntimeException:无法在订阅者 cmsdev-customer-sub$ECC_GIA$D 上创建新消费者,因为已经有一个订阅者与同一活动消费者使用相同的名称或不同的主题。参考:a55df62a-5bea-43c2-880a-f32509828982, TrackingId:909c3640-65a8-40f5-8440-d53d6c6ba766_B60,SystemTracker:gi :: G12:97974857:cmsdev-customer-sub,
当我设置一个 clientId(以保持服务总线提供的 ACIDity)相同时,它失败的感觉是正确的,因为集群确保新的修订版在拆除前一个修订版之前启动。
这是第 22 条军规的一部分。
我们真的很想保留 ACA 提供的零停机部署。
任何有见解并愿意提供建议的人将受到高度赞赏。
谢谢
格里
我创建了一个示例 Java 应用程序,并能够使用 JMS 成功侦听来自 Azure 服务总线主题的消息,这会使用客户端 ID 创建一个新订阅。
为了避免出现问题,我为每个实例创建了一个唯一的客户端 ID,以便每个容器实例都有自己的客户端 ID,即使在重新部署期间也是如此。
我已将以下行添加到我的
Main.java
文件中
String clientId = "client-" + UUID.randomUUID().toString();
Main.java:
import javax.jms.*;
import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactory;
import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactorySettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
public class Main {
private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final String CONNECTION_STRING = "<TopicConneString>";
private static final String TOPIC_NAME = "<topicName>";
private static final String SUBSCRIPTION_NAME = "<subscriptionName>";
private Connection connection;
private Session session;
private MessageConsumer consumer;
public static void main(String[] args) {
Main serviceBusApp = new Main();
serviceBusApp.run();
}
public void run() {
try {
initializeConnection();
createSubscriptionIfNeeded(SUBSCRIPTION_NAME);
sendMessage("Hello from Azure Service Bus!");
receiveMessages(SUBSCRIPTION_NAME);
} catch (JMSException e) {
logger.error("Error during Service Bus operation: {}", e.getMessage());
} finally {
closeResources();
}
}
private void initializeConnection() throws JMSException {
try {
String clientId = "client-" + UUID.randomUUID().toString();
ServiceBusJmsConnectionFactory factory = new ServiceBusJmsConnectionFactory(
CONNECTION_STRING, new ServiceBusJmsConnectionFactorySettings()
);
connection = factory.createConnection();
connection.setClientID(clientId);
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
logger.info("Successfully connected to Azure Service Bus with clientId: {}", clientId);
} catch (JMSException e) {
logger.error("Error initializing connection: {}", e.getMessage());
throw e;
}
}
private void createSubscriptionIfNeeded(String subscriptionName) {
try {
logger.info("Checking if subscription exists...");
logger.info("Subscription '{}' verified/created.", subscriptionName);
} catch (Exception e) {
logger.error("Error creating or verifying subscription: {}", e.getMessage());
}
}
public void sendMessage(String messageContent) {
try {
Destination destination = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(messageContent);
producer.send(message);
logger.info("Sent a single message to the topic: {}", TOPIC_NAME);
producer.close();
} catch (JMSException e) {
logger.error("Error sending message: {}", e.getMessage());
}
}
public void receiveMessages(String subscriptionName) {
try {
Destination destination = session.createTopic(TOPIC_NAME);
consumer = session.createDurableSubscriber((Topic) destination, subscriptionName);
consumer.setMessageListener(message -> {
try {
if (message instanceof TextMessage) {
String messageBody = ((TextMessage) message).getText();
logger.info("Received: {}", messageBody);
message.acknowledge();
}
} catch (JMSException e) {
logger.error("Error processing message: {}", e.getMessage());
}
});
logger.info("Waiting for messages... Press enter to stop.");
while (true) {
if (System.in.available() > 0) break;
}
} catch (Exception e) {
logger.error("Error receiving messages: {}", e.getMessage());
}
}
private void closeResources() {
try {
if (consumer != null) consumer.close();
if (session != null) session.close();
if (connection != null) connection.close();
logger.info("Closed connection and resources.");
} catch (JMSException e) {
logger.error("Error closing resources: {}", e.getMessage());
}
}
}
pom.xml:
<dependencies>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.12.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus-jms</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.32</version>
<scope>runtime</scope>
</dependency>
</dependencies>
Dockerfile:
FROM openjdk:21-jdk-slim as build
WORKDIR /app
COPY target/example-1.0-SNAPSHOT.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
输出:
我向 Azure 服务总线主题发送和接收消息。
使用客户端 ID 新创建的订阅在 Azure 服务总线主题中发送和接收的消息如下所示。
Azure 容器应用程序:
我在 Azure 容器应用程序中收到了以下发送和接收消息的日志,如下所示。