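We are using Artemis 2.19.0 to distribute XML messages. Recently we found that some messages can be lost when they are sent to a multicast address that has two durable multicast queues bound to it, where both durable queues have an XPATH filter.
For example:
Somehow IN.QUEUE1.FOO, IN.QUEUE2.FOO, or both end up not receiving all 1000 messages.
We have tried removing the filter from one of the queues, and then everything works: both queues receive all 1000 messages.
So, my questions are:
Please ask if anything is unclear. Thanks.
Update 1:
Versions: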
Java: 1.8
Spring-Integration: 5.5.11
Spring-jms: 5.3.19
Artemis: 2.19.0
XML file:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
             xmlns:int-file="http://www.springframework.org/schema/integration/file"
             xmlns:task="http://www.springframework.org/schema/task"
             xmlns:jms="http://www.springframework.org/schema/integration/jms"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
                 http://www.springframework.org/schema/beans/spring-beans.xsd
                 http://www.springframework.org/schema/integration
                 http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
                 http://www.springframework.org/schema/integration/jms
                 http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd
                 http://www.springframework.org/schema/integration/xml
                 http://www.springframework.org/schema/integration/xml/spring-integration-xml-4.3.xsd
                 http://www.springframework.org/schema/integration/file
                 http://www.springframework.org/schema/integration/file/spring-integration-file-4.3.xsd
                 http://www.springframework.org/schema/task
                 http://www.springframework.org/schema/task/spring-task-4.3.xsd">

    <!-- Multicast address -->
    <beans:bean id="topic" class="org.apache.activemq.artemis.jms.client.ActiveMQTopic">
        <beans:constructor-arg value="IN.ADDRESS.FOO"/>
    </beans:bean>

    <!-- Anycast queue -->
    <beans:bean id="queue" class="org.apache.activemq.artemis.jms.client.ActiveMQQueue">
        <beans:constructor-arg value="AQ.QUEUE.FOO"/>
    </beans:bean>

    <channel id="topicChannel">
    </channel>

    <task:executor id="executor" pool-size="2"/>

    <publish-subscribe-channel id="outChannel" task-executor="executor"/>

    <filter id="consumer1" input-channel="outChannel" output-channel="topicChannel" expression="payload.length() > 0"/>
    <filter id="consumer2" input-channel="outChannel" output-channel="topicChannel" expression="payload.length() > 0"/>

    <jms:message-driven-channel-adapter id="queueAdapter" destination="queue" channel="outChannel"
                                        acknowledge="auto" connection-factory="ConnectionFactory"/>

    <jms:outbound-channel-adapter id="topicAdapter" destination="topic" channel="topicChannel"
                                  connection-factory="ConnectionFactory"/>
</beans:beans>
Connection factory bean:
@Bean(name = "ConnectionFactory")
public SingleConnectionFactory ibConnectionFactory(
        @Value("${artemis.broker-url}") String brokerUrl,
        @Value("${artemis.user}") String username,
        @Value("${artemis.password}") String password) throws JMSException {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL(brokerUrl);
    factory.setUser(username);
    factory.setPassword(password);
    return new SingleConnectionFactory(factory);
}
Sender program:
// try-with-resources also closes the connection and session, not only the factory
try (ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password);
     Connection conn = fac.createConnection();
     Session session = conn.createSession()) {
    MessageProducer producer = session.createProducer(new ActiveMQQueue("AQ.QUEUE.FOO"));
    Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
    // Send the same message 1000 times to the anycast queue
    int count = 0;
    while (count < 1000) {
        System.out.println(count);
        producer.send(msg);
        count++;
    }
}
XPATH filter:
XPATH '/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text()="AK"]'
Sample message:
<?xml version="1.0" encoding="UTF-8"?><Root xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>
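As a quick local sanity check, the expression inside the filter can be evaluated against the sample message with the JDK's built-in XPath engine. This is only a sketch: the class name `XPathFilterCheck` and the helpers `sample` and `matches` are made up for illustration, and the JDK evaluator is not necessarily the one the broker uses, so it only confirms that the expression itself selects the sample message.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of Artemis or of the original setup.
final class XPathFilterCheck {

    // The expression from the queue filter, without the leading XPATH keyword and quotes.
    static final String EXPR =
            "/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text()=\"AK\"]";

    // Builds a message shaped like the sample, with a configurable ATCode value.
    static String sample(String atCode) {
        return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
                + "<Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\">"
                + "<Data><PrimaryKey><Key><DetailedIdentity><ATCode>" + atCode
                + "</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>";
    }

    // Evaluates EXPR as a boolean: true when at least one node is selected.
    static boolean matches(String xml) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            return (Boolean) xpath.evaluate(
                    EXPR, new InputSource(new StringReader(xml)), XPathConstants.BOOLEAN);
        } catch (XPathExpressionException e) {
            throw new IllegalStateException("XPath evaluation failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(sample("AK"))); // prints true
        System.out.println(matches(sample("ZZ"))); // prints false
    }
}
```

Since the sample message matches, every one of the 1000 messages is expected to pass the filter on both queues.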
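Removing the task-executor attribute from the publish-subscribe channel, or removing the filter from one of the queues, resolves the problem.
Update 2:
A minimal example: 10 concurrent tasks send 1000 messages in total. If every queue under the same address has an XPATH filter, the queues do not receive all 1000 messages, but after removing one of the filters it works.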
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;

import javax.jms.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestRunner {
    public static void main(String[] args) throws Exception {
        String brokerUrl = "(tcp://server1:61616,tcp://server2:61616)?ha=true&reconnectAttempts=-1&retryInterval=100&retryIntervalMultiplier=1.5&maxRetryInterval=6000";
        String user = "admin";
        String password = "admin";
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password);
        ExecutorService ser = Executors.newFixedThreadPool(10);
        // 10 concurrent tasks, each sending 100 messages (1000 in total)
        for (int i = 0; i < 10; i++) {
            ser.submit(() -> {
                // try-with-resources closes the connection and session when the task ends
                try (Connection conn = fac.createConnection();
                     Session session = conn.createSession()) {
                    MessageProducer producer = session.createProducer(new ActiveMQTopic("IN.ADDRESS.FOO"));
                    Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
                    int count = 0;
                    while (count < 100) {
                        System.out.println(count);
                        msg.setStringProperty("MessageId", String.valueOf(count));
                        producer.send(msg);
                        count++;
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }
        // Stop accepting new tasks; the pool threads end once all sends have finished
        ser.shutdown();
    }
}
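To rule out the expression itself, the same load shape (10 tasks, 100 messages each) can be replayed locally without a broker. The sketch below is an assumption-laden illustration: `ConcurrentXPathTally` and `countMatches` are hypothetical names, it uses the JDK's XPath engine rather than whatever the broker uses, and it does not reproduce the broker-side loss. Each task creates its own `XPath` object because JDK `XPath` instances are not thread-safe.

```java
import java.io.StringReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical local check, not part of Artemis or of the original test case.
final class ConcurrentXPathTally {

    static final String EXPR =
            "/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text()=\"AK\"]";

    static final String XML =
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
            + "<Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\">"
            + "<Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode>"
            + "</DetailedIdentity></Key></PrimaryKey></Data></Root>";

    // Runs `tasks` concurrent workers, each evaluating EXPR `perTask` times,
    // and returns how many evaluations matched.
    static int countMatches(int tasks, int perTask) {
        AtomicInteger matched = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                // One XPath object per task: instances must not be shared across threads.
                XPath xpath = XPathFactory.newInstance().newXPath();
                for (int n = 0; n < perTask; n++) {
                    try {
                        Boolean hit = (Boolean) xpath.evaluate(
                                EXPR, new InputSource(new StringReader(XML)),
                                XPathConstants.BOOLEAN);
                        if (hit) {
                            matched.incrementAndGet();
                        }
                    } catch (XPathExpressionException e) {
                        // A failed evaluation is simply not counted as a match.
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("evaluation tasks timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return matched.get();
    }

    public static void main(String[] args) {
        System.out.println(countMatches(10, 100));
    }
}
```

If this tally comes back complete while the queues still miss messages, the expression and the payload are not the problem, which points at how the filter is applied on the broker under concurrent sends.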
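Thanks for the test case. I was able to use it to reproduce the error you are seeing. I opened ARTEMIS-4687 and sent a PR to fix the problem. The fix will be in version 2.33.0, which is due to be released in the next few weeks.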