我想使用 JMS 监听器使用消息,但是监听器无法使用消息。 这是我对 Artemis 嵌入式代理的配置。
@Component
@EnableJms
public class ArtemisConfig {
private static final String TOPIC_NAME = "myTopic";
@Bean
public static ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://localhost:61616");
return new CachingConnectionFactory(connectionFactory);
}
@Bean
public void activeMQServer() throws Exception {
Configuration configuration = new ConfigurationImpl();
configuration.setPersistenceEnabled(false);
configuration.setSecurityEnabled(false);
Map<String, Object> params = new HashMap<>();
params.put("host", "localhost");
params.put("port", 61616);
params.put("Schema", "tcp");
TransportConfiguration transportConfiguration =
new TransportConfiguration(
NettyAcceptorFactory.class.getName(), params);
configuration.setAcceptorConfigurations(
Collections.singleton(transportConfiguration));
ActiveMQServer server =
ActiveMQServers.newActiveMQServer(configuration);
CoreAddressConfiguration coreAddressConfiguration =
new CoreAddressConfiguration();
coreAddressConfiguration.setName(TOPIC_NAME)
.addRoutingType(RoutingType.MULTICAST);
configuration.addAddressConfiguration(coreAddressConfiguration);
QueueConfiguration queueConfiguration =
new QueueConfiguration(TOPIC_NAME);
queueConfiguration.setAddress(TOPIC_NAME);
queueConfiguration.setName(TOPIC_NAME);
queueConfiguration.setRoutingType(RoutingType.MULTICAST);
queueConfiguration.setDurable(true);
configuration.addQueueConfiguration(queueConfiguration);
server.start();
}
@Bean(name = "jmsTemplate")
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
return jmsTemplate;
}
@Bean
public JmsListenerContainerFactory<?> myFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
factory.setConcurrency("1-4");
return factory;
}
}
这是我的监听器,它消耗队列中的消息
@Component
public class ArtemisListener {
@JmsListener(destination = "myTopic", containerFactory = "myFactory")
public void listener(Message message){
System.out.println("Message received");
}
}
我正在运行一个测试,在日志中,我可以看到队列已创建,消息已发送到指定地址,并且消费者也已创建,但消息未消费。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ArtemisConfig.class})
public class ArtemisIT {
@Autowired
JmsTemplate jmsTemplate;
@Test
public void checkIfListenerWorks() throws InterruptedException {
jmsTemplate.convertAndSend("myTopic", "Hello");
Thread.sleep(2000);
}
}
我在日志中没有看到“已收到消息”。
AMQ601065: User anonymous is creating a queue on target resource: ServerSessionImpl(jms-session=*N/A*) [with parameters: [QueueConfiguration [id=null, name=730d9da3-3d8c-4042-8ef8-c51788f646a5, address=myTopic, routingType=MULTICAST, filterString=null, durable=false, user=null, maxConsumers=-1, exclusive=null, groupRebalance=null, groupBuckets=null, groupFirstKey=null, lastValue=null, lastValueKey=null, nonDestructive=null, purgeOnNoConsumers=false, consumersBeforeDispatch=null, delayBeforeDispatch=null, consumerPriority=null, autoDelete=null, autoDeleteDelay=null, autoDeleteMessageCount=null, ringSize=null, configurationManaged=null, temporary=true, autoCreateAddress=null, internal=null, transient=null, autoCreated=false]]]
AMQ601265: User anonymous is creating a core consumer on target resource ServerSessionImpl(jms-session=*N/A*) [with parameters: [0, 730d9da3-3d8c-4042-8ef8-c51788f646a5, null, 0, false, true, null]]
AMQ601500: User anonymous is sending a core message on target resource: ServerSessionImpl(jms-session=*N/A*) [with parameters: [TransactionImpl [xid=null, txID=13, xid=null, state=ACTIVE, createTime=1721642690458(Mon Jul 22 10:04:50 UTC 2024), timeoutSeconds=300, nr operations = 0]@78c18d2c, CoreMessage[messageID=0,durable=true,userID=d57b5c8f-4811-11ef-b49b-8045dd328039,priority=4, timestamp=Mon Jul 22 10:04:50 UTC 2024,expiration=0, durable=true, address=myTopic,size=480,properties=TypedProperties[__AMQ_CID=d5352bca-4811-11ef-b49b-8045dd328039,_AMQ_ROUTING_TYPE=1]]@1550453457, true, false, RoutingContextImpl(Address=null, routingType=null, PreviousAddress=null previousRoute:null, reusable=null, version=0)
我的配置需要进行任何更改吗?正如我在发送消息后立即看到服务器关闭日志一样。
您的问题表明您正在从 JMS queue 进行消费,但在代码中您使用的是 JMS topic。 JMS 队列和主题之间的语义和配置有很大不同,因此这可能是问题的根源。
首先,如果您使用 JMS 主题,则无需配置 Core 队列。您只需要一个支持多播的地址。 本文档 详细解释了 JMS 概念如何映射到核心 API。事实上,默认情况下,代理将自动创建您需要的所有地址和队列,因此从技术上讲,您甚至不需要配置地址。此外,Core JMS 客户端不会使用您手动配置的核心队列。它将创建自己的非持久订阅队列,以遵守预期的 JMS 语义。 其次,由于 JMS 主题遵循发布/订阅语义,因此您
必须在将消息发送到主题之前创建 JMS 主题订阅。您的描述中并不清楚您实际上正在这样做。