我们的应用程序使用RabbitMQ提供的几个队列中的数据。为了提高吞吐量,我们为每个队列启动多个线程,这些线程从这些队列中执行阻塞
对于新服务,我们希望使用Spring Boot,并且每个队列还有几个线程从这些队列中获取数据。以下是用于处理从某个队列到达的数据的规范Spring Boot代码:
@StreamListener(target = Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<SomeData> process(Message<SomeData> message) {
SomeData result = service.process(message.getPayload());
return MessageBuilder
.withPayload(result)
.copyHeaders(message.getHeaders())
.build();
}
现在的问题是如何使Spring Boot产生几个线程来服务一个队列而不是一个线程。吞吐量对我们的应用非常关键,因此需要这样做。
检查available properties,搜索rabbitmq。
spring.rabbitmq.listener.simple.concurrency =#侦听器调用者线程的最小数量
看起来很有希望
您可以在配置队列时为队列设置并发使用者。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory
(MessageConverter contentTypeConverter,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// the number of consumers is set as 5
factory.setConcurrentConsumers(5);
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(contentTypeConverter);
return factory;
}