ActiveMQ - Redelivery policy and dead letter queue for topics


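I am using ActiveMQ Artemis 2.17 and Spring Boot 2.5.7. I publish messages to topics and queues and consume them. Everything is done through JMS. All queues (anycast or multicast) are durable. My topic (multicast address) has 2 durable queues so that there are 2 independent consumers. For my topic, both consumers use durable, shared subscriptions (JMS 2.0). All processing is transactional and managed by the Atomikos transaction manager (I need it for two-phase commit with the database).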

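I have a problem with the redelivery policy and the DLQ. When I throw an exception while processing a message, the redelivery policy is applied correctly for the queue (anycast queue), and a DLQ is created containing the message. For the topic (multicast queue), however, the redelivery policy is not applied and the message is not sent to a DLQ.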

Here is my ActiveMQ Artemis broker configuration:

<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

    <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="urn:activemq:core ">

        <name>0.0.0.0</name>


        <!-- Codec and key used to encode the passwords -->
        <!-- TODO : set master-password configuration into the Java code -->
        <password-codec>org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;key=UBNTd0dS9w6f8HDIyGW9
        </password-codec>


        <!-- Configure the persistence into a database (postgresql) -->
        <persistence-enabled>true</persistence-enabled>
        <store>
            <database-store>
                <bindings-table-name>BINDINGS_TABLE</bindings-table-name>
                <message-table-name>MESSAGE_TABLE</message-table-name>
                <page-store-table-name>PAGE_TABLE</page-store-table-name>
                <large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
                <node-manager-store-table-name>NODE_MANAGER_TABLE</node-manager-store-table-name>
                <jdbc-lock-renew-period>2000</jdbc-lock-renew-period>
                <jdbc-lock-expiration>20000</jdbc-lock-expiration>
                <jdbc-journal-sync-period>5</jdbc-journal-sync-period>
                <!-- Configure a connection pool -->
                <data-source-properties>
                    <data-source-property key="driverClassName" value="org.postgresql.Driver"/>
                    <data-source-property key="url" value="jdbc:postgresql://localhost/artemis"/>
                    <data-source-property key="username" value="postgres"/>
                    <data-source-property key="password" value="ENC(-3eddbe9664c85ec7ed63588b000486cb)"/>
                    <data-source-property key="poolPreparedStatements" value="true"/>
                    <data-source-property key="initialSize" value="2"/>
                    <data-source-property key="minIdle" value="1"/>
                </data-source-properties>
            </database-store>
        </store>


        <!-- Configure the addresses, queues and topics default behaviour -->
        <!-- See: https://activemq.apache.org/components/artemis/documentation/latest/address-model.html -->
        <address-settings>
            <address-setting match="#">
                <dead-letter-address>DLA</dead-letter-address>
                <auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
                <dead-letter-queue-prefix/>
                <dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
                <expiry-address>ExpiryQueue</expiry-address>
                <auto-create-expiry-resources>false</auto-create-expiry-resources>
                <expiry-queue-prefix/>
                <expiry-queue-suffix>.EXP</expiry-queue-suffix>
                <expiry-delay>-1</expiry-delay>
                <max-delivery-attempts>5</max-delivery-attempts>
                <redelivery-delay>250</redelivery-delay>
                <redelivery-delay-multiplier>2.0</redelivery-delay-multiplier>
                <redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
                <max-redelivery-delay>10000</max-redelivery-delay>
                <max-size-bytes>100000</max-size-bytes>
                <page-size-bytes>20000</page-size-bytes>
                <page-max-cache-size>5</page-max-cache-size>
                <max-size-bytes-reject-threshold>-1</max-size-bytes-reject-threshold>
                <address-full-policy>PAGE</address-full-policy>
                <message-counter-history-day-limit>0</message-counter-history-day-limit>
                <default-last-value-queue>false</default-last-value-queue>
                <default-non-destructive>false</default-non-destructive>
                <default-exclusive-queue>false</default-exclusive-queue>
                <default-consumers-before-dispatch>0</default-consumers-before-dispatch>
                <default-delay-before-dispatch>-1</default-delay-before-dispatch>
                <redistribution-delay>0</redistribution-delay>
                <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
                <slow-consumer-threshold>-1</slow-consumer-threshold>
                <slow-consumer-policy>NOTIFY</slow-consumer-policy>
                <slow-consumer-check-period>5</slow-consumer-check-period>
                <!-- We disable the automatic creation of queue or topic -->
                <auto-create-queues>false</auto-create-queues>
                <auto-delete-queues>true</auto-delete-queues>
                <auto-delete-created-queues>false</auto-delete-created-queues>
                <auto-delete-queues-delay>30000</auto-delete-queues-delay>
                <auto-delete-queues-message-count>0</auto-delete-queues-message-count>
                <config-delete-queues>OFF</config-delete-queues>
                <!-- We disable the automatic creation of address -->
                <auto-create-addresses>false</auto-create-addresses>
                <auto-delete-addresses>true</auto-delete-addresses>
                <auto-delete-addresses-delay>30000</auto-delete-addresses-delay>
                <config-delete-addresses>OFF</config-delete-addresses>
                <management-browse-page-size>200</management-browse-page-size>
                <default-purge-on-no-consumers>false</default-purge-on-no-consumers>
                <default-max-consumers>-1</default-max-consumers>
                <default-queue-routing-type>ANYCAST</default-queue-routing-type>
                <default-address-routing-type>ANYCAST</default-address-routing-type>
                <default-ring-size>-1</default-ring-size>
                <retroactive-message-count>0</retroactive-message-count>
                <enable-metrics>true</enable-metrics>
                <!-- We automatically force group rebalance and a dispatch pause during group rebalance -->
                <default-group-rebalance>true</default-group-rebalance>
                <default-group-rebalance-pause-dispatch>true</default-group-rebalance-pause-dispatch>
                <default-group-buckets>1024</default-group-buckets>
            </address-setting>
        </address-settings>


        <!-- Define the protocols accepted -->
        <!-- See: https://activemq.apache.org/components/artemis/documentation/latest/protocols-interoperability.html -->
        <acceptors>
            <!-- Acceptor for only CORE protocol -->
            <!-- We enable the cache destination as recommended into the documentation. See: https://activemq.apache.org/components/artemis/documentation/latest/using-jms.html -->
            <acceptor name="artemis">
                tcp://0.0.0.0:61616?protocols=CORE,tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;useEpoll=true,cacheDestinations=true
            </acceptor>
        </acceptors>


        <!-- Define how to connect to another broker -->
        <!-- TODO: is this useful? -->
        <connectors>
            <connector name="netty-connector">tcp://localhost:61616</connector>
            <connector name="broker1-connector">tcp://localhost:61616</connector>
            <connector name="broker2-connector">tcp://localhost:61617</connector>
        </connectors>

        <!-- Configure the High-Availability and broker cluster for the high-availability -->
        <ha-policy>
            <shared-store>
                <master>
                    <failover-on-shutdown>true</failover-on-shutdown>
                </master>
            </shared-store>
        </ha-policy>

        <cluster-connections>
            <cluster-connection name="gerico-cluster">
                <connector-ref>netty-connector</connector-ref>
                <static-connectors>
                    <connector-ref>broker1-connector</connector-ref>
                    <connector-ref>broker2-connector</connector-ref>
                </static-connectors>
            </cluster-connection>
        </cluster-connections>

        <!--       <cluster-user>cluster_user</cluster-user>-->
        <!--       <cluster-password>cluster_user_password</cluster-password>-->

        <!-- should the broker detect dead locks and other issues -->
        <critical-analyzer>true</critical-analyzer>

        <critical-analyzer-timeout>120000</critical-analyzer-timeout>

        <critical-analyzer-check-period>60000</critical-analyzer-check-period>

        <critical-analyzer-policy>HALT</critical-analyzer-policy>


        <page-sync-timeout>72000</page-sync-timeout>


        <!-- the system will enter into page mode once you hit this limit.
       This is an estimate in bytes of how much the messages are using in memory

        The system will use half of the available memory (-Xmx) by default for the global-max-size.
        You may specify a different value here if you need to customize it to your needs.

        <global-max-size>100Mb</global-max-size>

  -->

        <!-- Security configuration -->
        <security-enabled>false</security-enabled>

        <!-- Addresses and queues configuration -->
        <!-- !!! DON'T FORGET TO UPDATE 'slave-broker.xml' FILE !!! -->
        <addresses>
            <address name="topic.test_rde">
                <multicast>
                    <queue name="rde_receiver_1">
                        <durable>true</durable>
                    </queue>
                    <queue name="rde_receiver_2">
                        <durable>true</durable>
                    </queue>
                </multicast>
            </address>
            <address name="queue.test_rde">
                <anycast>
                    <queue name="queue.test_rde">
                        <durable>true</durable>
                    </queue>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>
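
For reference, the redelivery values in the address-setting above (redelivery-delay 250, redelivery-delay-multiplier 2.0, max-redelivery-delay 10000, max-delivery-attempts 5) suggest roughly the schedule sketched below. This is a minimal sketch, assuming the delay before redelivery n is redelivery-delay × multiplier^(n-1), capped at max-redelivery-delay, and that the message goes to the dead letter address after the last failed attempt. The class and method names are hypothetical, and the random jitter added by redelivery-collision-avoidance-factor is ignored, so real delays will differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Artemis: estimates the delays between delivery attempts.
public class RedeliverySchedule {

    // Returns the delay (ms) applied after each failed attempt that is still followed by a redelivery.
    static List<Long> delays(long initialDelay, double multiplier, long maxDelay, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        // After the last allowed attempt fails, no further delay applies: the message is dead-lettered.
        for (int failed = 1; failed < maxAttempts; failed++) {
            long delay = (long) (initialDelay * Math.pow(multiplier, failed - 1));
            result.add(Math.min(delay, maxDelay)); // assumed cap from max-redelivery-delay
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the address-setting in the broker configuration.
        System.out.println(delays(250, 2.0, 10_000, 5)); // prints [250, 500, 1000, 2000]
    }
}
```

Under these assumptions a failing listener should see the message 5 times over a few seconds before it shows up in the DLQ, which gives a quick way to tell from the logs whether the policy is being applied to a given queue.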

The JMS configuration in Spring Boot is as follows:

@Bean
public DynamicDestinationResolver destinationResolver() {
    return new DynamicDestinationResolver() {
        @Override
        public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
            // Names starting with "topic." resolve to topics; everything else resolves to queues.
            boolean isTopic = destinationName.startsWith("topic.");
            return super.resolveDestinationName(session, destinationName, isTopic);
        }
    };
}

@Bean
public JmsListenerContainerFactory<?> queueConnectionFactory(ConnectionFactory connectionFactory,
                                                             DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                             JmsErrorHandler jmsErrorHandler) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setPubSubDomain(false);
    factory.setSessionTransacted(true);
    factory.setErrorHandler(jmsErrorHandler);

    return factory;
}

@Bean
public JmsListenerContainerFactory<?> topicConnectionFactory(ConnectionFactory connectionFactory,
                                                             DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                             JmsErrorHandler jmsErrorHandler) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    factory.setPubSubDomain(true);
    factory.setSessionTransacted(true);
    factory.setSubscriptionDurable(true);
    factory.setSubscriptionShared(true);
    factory.setErrorHandler(jmsErrorHandler);

    return factory;
}

Message publisher:

@GetMapping("api/send")
public void sendDataToJms() throws InterruptedException {
    OrderRest currentService = this.applicationContext.getBean(OrderRest.class);
    for (Long i = 0L; i < 1L; i++) {
        currentService.sendTopicMessage(i);
        currentService.sendQueueMessage(i);
        Thread.sleep(200L);
    }
}

@Transactional
public void sendTopicMessage(Long id) {
    Order myMessage = new Order("--" + id.toString() + "--", new Date());
    jmsTemplate.convertAndSend("topic.test_rde", myMessage);
}

@Transactional
public void sendQueueMessage(Long id) {
    Order myMessage = new Order("--" + id.toString() + "--", new Date());
    jmsTemplate.convertAndSend("queue.test_rde", myMessage);
}

Listeners:

@Transactional
@JmsListener(destination = "topic.test_rde", containerFactory = "topicConnectionFactory", subscription = "rde_receiver_1")
public void receiveMessage_rde_1(@Payload Order order, @Headers MessageHeaders headers, Message message, Session session) {
    log.info("---> Consumer1 - rde_receiver_1 - " + order.getContent());
    throw new ValidationException("Voluntary exception", "entity", List.of(), List.of());
}

@Transactional
@JmsListener(destination = "queue.test_rde", containerFactory = "queueConnectionFactory")
public void receiveMessage_rde_queue(@Payload Order order, @Headers MessageHeaders headers, Message message, Session session) {
    log.info("---> Consumer1 - rde_receiver_queue - " + order.getContent());
    throw new ValidationException("Voluntary exception", "entity", List.of(), List.of());
}

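Is it normal that the redelivery policy and the DLQ mechanism apply only to queues (anycast queues)?
Can they also be applied to topics (multicast queues) with shared durable subscriptions?
If not, how can I get topic behaviour together with the redelivery and DLQ mechanisms? Should I use ActiveMQ's divert feature?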

java jms spring-jms activemq-artemis atomikos
1 Answer

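I found the problem. It comes from Atomikos, which does not support JMS 2.0, only JMS 1.1. It is therefore not possible to have a shared durable subscription on a topic that supports both 2-PC and the redelivery policy.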
