消费者无法使用队列中的消息 - ActiveMQ Artemis

问题描述 投票:0回答:1

我想使用 JMS 监听器使用消息,但是监听器无法使用消息。 这是我对 Artemis 嵌入式代理的配置。

@Component
@EnableJms
public class ArtemisConfig {
    private static final String TOPIC_NAME = "myTopic";

    @Bean
    public static ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory("tcp://localhost:61616");
        return new CachingConnectionFactory(connectionFactory);
    }

    @Bean
    public void activeMQServer() throws Exception {
        Configuration configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled(false);
        configuration.setSecurityEnabled(false);

        Map<String, Object> params = new HashMap<>();
        params.put("host", "localhost");
        params.put("port", 61616);
        params.put("Schema", "tcp");
        TransportConfiguration transportConfiguration =
            new TransportConfiguration(
                NettyAcceptorFactory.class.getName(), params);

        configuration.setAcceptorConfigurations(
            Collections.singleton(transportConfiguration));

        ActiveMQServer server =
            ActiveMQServers.newActiveMQServer(configuration);

        CoreAddressConfiguration coreAddressConfiguration =
            new CoreAddressConfiguration();
        coreAddressConfiguration.setName(TOPIC_NAME)
            .addRoutingType(RoutingType.MULTICAST);
        configuration.addAddressConfiguration(coreAddressConfiguration);

        QueueConfiguration queueConfiguration =
            new QueueConfiguration(TOPIC_NAME);
        queueConfiguration.setAddress(TOPIC_NAME);
        queueConfiguration.setName(TOPIC_NAME);
        queueConfiguration.setRoutingType(RoutingType.MULTICAST);
        queueConfiguration.setDurable(true);
        configuration.addQueueConfiguration(queueConfiguration);

        server.start();
    }

    @Bean(name = "jmsTemplate")
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        return jmsTemplate;
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        factory.setConcurrency("1-4");
        return factory;
    }
}

这是我的监听器,它消耗队列中的消息

@Component
public class ArtemisListener {

  @JmsListener(destination = "myTopic", containerFactory = "myFactory")
  public void listener(Message message){
    System.out.println("Message received");
  }
}

我正在运行一个测试,在日志中,我可以看到队列已创建,消息已发送到指定地址,并且消费者也已创建,但消息未消费。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ArtemisConfig.class})
public class ArtemisIT {

  @Autowired
  JmsTemplate jmsTemplate;

  @Test
  public void checkIfListenerWorks() throws InterruptedException {
    jmsTemplate.convertAndSend("myTopic", "Hello");
    Thread.sleep(2000);
  }
}

我在日志中没有看到“已收到消息”。

AMQ601065: User anonymous is creating a queue on target resource: ServerSessionImpl(jms-session=*N/A*) [with parameters: [QueueConfiguration [id=null, name=730d9da3-3d8c-4042-8ef8-c51788f646a5, address=myTopic, routingType=MULTICAST, filterString=null, durable=false, user=null, maxConsumers=-1, exclusive=null, groupRebalance=null, groupBuckets=null, groupFirstKey=null, lastValue=null, lastValueKey=null, nonDestructive=null, purgeOnNoConsumers=false, consumersBeforeDispatch=null, delayBeforeDispatch=null, consumerPriority=null, autoDelete=null, autoDeleteDelay=null, autoDeleteMessageCount=null, ringSize=null, configurationManaged=null, temporary=true, autoCreateAddress=null, internal=null, transient=null, autoCreated=false]]]

AMQ601265: User anonymous is creating a core consumer on target resource ServerSessionImpl(jms-session=*N/A*) [with parameters: [0, 730d9da3-3d8c-4042-8ef8-c51788f646a5, null, 0, false, true, null]]

AMQ601500: User anonymous is sending a core message on target resource: ServerSessionImpl(jms-session=*N/A*) [with parameters: [TransactionImpl [xid=null, txID=13, xid=null, state=ACTIVE, createTime=1721642690458(Mon Jul 22 10:04:50 UTC 2024), timeoutSeconds=300, nr operations = 0]@78c18d2c, CoreMessage[messageID=0,durable=true,userID=d57b5c8f-4811-11ef-b49b-8045dd328039,priority=4, timestamp=Mon Jul 22 10:04:50 UTC 2024,expiration=0, durable=true, address=myTopic,size=480,properties=TypedProperties[__AMQ_CID=d5352bca-4811-11ef-b49b-8045dd328039,_AMQ_ROUTING_TYPE=1]]@1550453457, true, false, RoutingContextImpl(Address=null, routingType=null, PreviousAddress=null previousRoute:null, reusable=null, version=0)

我的配置需要进行任何更改吗?正如我在发送消息后立即看到服务器关闭日志一样。

java jms activemq-artemis
1个回答
0
投票

您的问题表明您正在从 JMS queue 进行消费,但在代码中您使用的是 JMS topic。 JMS 队列和主题之间的语义和配置有很大不同,因此这可能是问题的根源。

首先,如果您使用 JMS 主题,则无需配置 Core 队列。您只需要一个支持多播的地址。 本文档 详细解释了 JMS 概念如何映射到核心 API。事实上,默认情况下,代理将自动创建您需要的所有地址和队列,因此从技术上讲,您甚至不需要配置地址。此外,Core JMS 客户端不会使用您手动配置的核心队列。它将创建自己的非持久订阅队列,以遵守预期的 JMS 语义。 其次,由于 JMS 主题遵循发布/订阅语义,因此您

必须

在将消息发送到主题之前创建 JMS 主题订阅。您的描述中并不清楚您实际上正在这样做。

© www.soinside.com 2019 - 2024. All rights reserved.