我正在使用 ActiveMQ 配置 Spring JMS,并对这套配置进行测试。
我想设置一个 RedeliveryPolicy,使消息处理失败后不会立即重试;但从我的日志和 ActiveMQ 代理(broker)中可以看到,消息仍然被立即重试,我定义的 RedeliveryPolicy bean 并没有被使用。
谁能指出我哪里做错了?根据我在文档中查到的内容,这样配置似乎是正确的。(另外,如果有人知道为什么我的 IndividualDeadLetterStrategy 被忽略、消息总是进入通用 DLQ,也欢迎指出。)
/**
 * Creates the Jackson 2 based JMS message converter bean.
 *
 * <p>The converter is configured to target {@link MessageType#TEXT} and to use
 * the {@code _type} message property as the type-id property name.
 *
 * @return the configured message converter
 */
@Bean
public MessageConverter jacksonJmsMessageConverter() {
    final MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
    // Name of the message property that carries the type id.
    jsonConverter.setTypeIdPropertyName("_type");
    jsonConverter.setTargetType(MessageType.TEXT);
    return jsonConverter;
}
/**
 * Creates an {@link IndividualDeadLetterStrategy} bean that uses the
 * {@code .dlq} queue suffix and has {@code useQueueForQueueMessages} enabled.
 *
 * @return the configured dead-letter strategy
 */
@Bean
public DeadLetterStrategy deadLetterStrategy() {
    final IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
    strategy.setUseQueueForQueueMessages(true);
    strategy.setQueueSuffix(".dlq");
    return strategy;
}
/**
 * Creates the {@link RedeliveryPolicy} bean: initial redelivery delay of 5000,
 * exponential back-off enabled with a multiplier of 2, and at most 5 redeliveries.
 *
 * @return the configured redelivery policy
 */
@Bean
public RedeliveryPolicy redeliveryPolicy() {
    final RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setMaximumRedeliveries(5);
    policy.setInitialRedeliveryDelay(5000);
    // Back-off settings: enabled, with a multiplier of 2.
    policy.setUseExponentialBackOff(true);
    policy.setBackOffMultiplier(2);
    return policy;
}
/**
 * Exposes the destination named {@code myQueue} as a bean.
 *
 * @return an {@link ActiveMQQueue} named {@code myQueue}
 */
@Bean
public Queue myQueue() {
    final Queue destination = new ActiveMQQueue("myQueue");
    return destination;
}
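deadLetterStrategy 是 broker 端的配置,因此必须像下面这样把它定义为 bean,并设置到 broker 的目标策略(destination policy)中;RedeliveryPolicy 则需要设置到 ActiveMQConnectionFactory 上才会生效。
详情请参阅 http://activemq.apache.org/message-redelivery-and-dlq-handling.html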
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.stereotype.Component;
/**
 * Spring Boot application and configuration that defines an in-process
 * {@link BrokerService}, the destination policy applied to it, and a JMS
 * connection factory carrying a custom {@link RedeliveryPolicy}.
 *
 * <p>The dead-letter strategy is attached to the broker through the
 * {@link PolicyMap}; the redelivery policy is attached to the connection factory.
 */
@SpringBootApplication
@Configuration
@ComponentScan(
        excludeFilters = @ComponentScan.Filter(
                type = FilterType.ANNOTATION,
                classes = { Configuration.class, Component.class }))
public class ActiveMQConfigurationDeadLetterStrategy {

    /**
     * Launches the Spring application using this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ActiveMQConfigurationDeadLetterStrategy.class, args);
    }

    /**
     * Defines the broker bean: non-persistent, with a TCP connector on
     * {@code localhost:61616} and a VM connector, using {@link #policyMap()}
     * as its destination policy.
     *
     * @return the configured broker service
     * @throws Exception if a connector cannot be added
     */
    @Bean
    public BrokerService broker() throws Exception {
        final BrokerService brokerService = new BrokerService();
        // Connectors are registered in the same order as before: TCP first, then VM.
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.addConnector("vm://localhost");
        brokerService.setPersistent(false);
        brokerService.setDestinationPolicy(policyMap());
        return brokerService;
    }

    /**
     * Builds the destination policy map: one entry matching every queue and one
     * matching every topic (both via the {@code >} wildcard), each using
     * {@link #deadLetterStrategy()}.
     *
     * @return the policy map holding the queue entry followed by the topic entry
     */
    @Bean
    public PolicyMap policyMap() {
        final PolicyEntry allQueues = new PolicyEntry();
        allQueues.setQueue(">");
        allQueues.setDeadLetterStrategy(deadLetterStrategy());

        final PolicyEntry allTopics = new PolicyEntry();
        allTopics.setTopic(">");
        allTopics.setDeadLetterStrategy(deadLetterStrategy());

        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
        policyEntries.add(allQueues);
        policyEntries.add(allTopics);

        final PolicyMap policies = new PolicyMap();
        policies.setPolicyEntries(policyEntries);
        return policies;
    }

    /**
     * Creates an {@link IndividualDeadLetterStrategy} that uses the {@code .dlq}
     * queue suffix and has {@code useQueueForQueueMessages} enabled.
     *
     * @return the configured dead-letter strategy
     */
    @Bean
    public DeadLetterStrategy deadLetterStrategy() {
        final IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
        strategy.setUseQueueForQueueMessages(true);
        strategy.setQueueSuffix(".dlq");
        return strategy;
    }

    /**
     * Creates the redelivery policy: initial redelivery delay of 5000,
     * exponential back-off enabled with a multiplier of 2, and at most
     * 5 redeliveries.
     *
     * @return the configured redelivery policy
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        final RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(5);
        policy.setInitialRedeliveryDelay(5000);
        policy.setUseExponentialBackOff(true);
        policy.setBackOffMultiplier(2);
        return policy;
    }

    /**
     * Creates the JMS connection factory and installs {@link #redeliveryPolicy()}
     * on it. No broker URL is passed, so the factory's own default is used.
     *
     * @return the connection factory with the redelivery policy applied
     */
    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setRedeliveryPolicy(redeliveryPolicy());
        return factory;
    }
}
更新
如果您使用的是外部(独立运行的)ActiveMQ broker,则 deadLetterStrategy 只能在 activemq.xml 配置文件的目标策略映射(destinationPolicy)中设置,例如:
<!--
Broker-side destination policy for activemq.xml: attaches an
individualDeadLetterStrategy (queuePrefix "DLQ.", useQueueForQueueMessages
"true") to the policy entry whose queue pattern is '>'.
-->
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
<deadLetterStrategy>
<!--
Use the prefix 'DLQ.' for the destination name, and make
the DLQ a queue rather than a topic
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.stereotype.Component;
/**
 * Spring Boot application and configuration for the external-broker variant:
 * it defines only the client-side {@link RedeliveryPolicy} and a connection
 * factory pointing at {@code tcp://localhost:61616}.
 */
@SpringBootApplication
@Configuration
@ComponentScan(
        excludeFilters = @ComponentScan.Filter(
                type = FilterType.ANNOTATION,
                classes = { Configuration.class, Component.class }))
public class ActiveMQConfigurationDeadLetterStrategy {

    /**
     * Launches the Spring application using this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ActiveMQConfigurationDeadLetterStrategy.class, args);
    }

    /**
     * Creates the redelivery policy: initial redelivery delay of 5000,
     * exponential back-off enabled with a multiplier of 2, and at most
     * 5 redeliveries.
     *
     * @return the configured redelivery policy
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        final RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(5);
        policy.setInitialRedeliveryDelay(5000);
        policy.setUseExponentialBackOff(true);
        policy.setBackOffMultiplier(2);
        return policy;
    }

    /**
     * Creates the JMS connection factory for the broker at
     * {@code tcp://localhost:61616} and installs {@link #redeliveryPolicy()} on it.
     *
     * @return the connection factory with the redelivery policy applied
     */
    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        final ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        factory.setRedeliveryPolicy(redeliveryPolicy());
        return factory;
    }
}