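I am trying to improve the efficiency of consuming messages from a RabbitMQ queue.
I cannot get the concurrency and prefetch count settings in my Spring configuration to take effect on the RabbitMQ SimpleMessageListenerContainer.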
Spring Boot 3.2.5
Spring Cloud 2023.0.1
Spring Cloud Stream
spring-cloud-stream-binder-rabbit
Java 21
RabbitMQ server version 3.6.12
spring:
  application:
    name: app-third
  rabbitmq:
    host: ${RABBITMQ_URL:localhost}
    port: ${RABBITMQ_PORT:5672}
    virtual-host: ${RABBITMQ_VIRTUALHOST:}
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3
        concurrency: 5
        max-concurrency: 25
        prefetch: 10
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          destination: app.third-request
          group: app-third
        process-out-0:
          destination: app.third-response
          group: app-third
  threads:
    virtual:
      enabled: true
My custom TaskExecutor bean:
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);
    executor.setMaxPoolSize(200);
    executor.setQueueCapacity(400);
    executor.initialize();
    return executor;
}
The settings only take effect when I create a custom SimpleMessageListenerContainer bean:
@Bean
public SimpleMessageListenerContainer messageListenerContainer(
        ConnectionFactory connectionFactory,
        @Qualifier("taskExecutor") TaskExecutor taskExecutor,
        MessageListenerAdapter listenerAdapter
) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("app.third-request.app-third");
    container.setTaskExecutor(taskExecutor);
    container.setConcurrentConsumers(5);
    container.setMaxConcurrentConsumers(50);
    container.setPrefetchCount(10);
    return container;
}
However, this container does not apply to my message queue function.
Log output:
[ main] o.s.a.r.l.SimpleMessageListenerContainer : Changing consumers from 1 to 1
[ main] o.s.a.r.l.SimpleMessageListenerContainer : No global properties bean
[ main] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
[ app-third-1] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@13ddaffb: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ app-third-1] o.s.a.r.listener.BlockingQueueConsumer : Started on queue 'app.third-request.app-third' with tag amq.ctag-zP9jnSC-NwTYivlWOLf2kw:
Consumer@13ddaffb: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,1),
conn: Proxy@51172948 Shared Rabbit Connection: SimpleConnection@fe13916 [delegate=amqp://[email protected]:5672/rabbitmq-instance, localPort=55370], acknowledgeMode=AUTO local queue size=0
[ main] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@1ee22768: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-5] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@38a52072: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@746f8520: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@6108fd23: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-4] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@3d3a28b5: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-5] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,3)
[ taskExecutor-3] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,4)
[ taskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,2)
[ taskExecutor-4] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,6)
[ taskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,5)
[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Started on queue 'app.third-request.app-third' with tag amq.ctag-HrAsdYteFh7S8s2KPZsrDg:
Consumer@1ee22768: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,2),
conn: Proxy@51172948 Shared Rabbit Connection: SimpleConnection@fe13916 [delegate=amqp://[email protected]:5672/rabbitmq-instance, localPort=55370], acknowledgeMode=AUTO local queue size=0
(followed by 4 more "Started on queue" entries)
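One way to read the log above is to count the "Starting consumer" lines per thread-name prefix. The sketch below does that with plain Java on shortened copies of those lines; the class and method names are hypothetical and not part of the application. It suggests that the container started on the `app-third-*` thread runs a single consumer, while the five consumers on `taskExecutor-*` threads belong to the custom container bean. That the `app-third-*` container is the one created for the function binding is an interpretation, not something the log states directly.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConsumerStartCount {

    // Captures the thread name inside the leading square brackets of a
    // log line that contains the "Starting consumer" message.
    private static final Pattern STARTING =
            Pattern.compile("^\\[\\s*([^\\]]+?)\\s*\\].*Starting consumer");

    /** Counts "Starting consumer" lines per thread prefix (trailing "-<number>" removed). */
    static Map<String, Integer> countByThreadPrefix(List<String> logLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = STARTING.matcher(line);
            if (m.find()) {
                String prefix = m.group(1).replaceAll("-\\d+$", "");
                counts.merge(prefix, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Shortened copies of the "Starting consumer" lines from the log output.
        List<String> lines = List.of(
            "[ app-third-1] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@13ddaffb",
            "[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@1ee22768",
            "[ taskExecutor-5] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@38a52072",
            "[ taskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@746f8520",
            "[ taskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@6108fd23",
            "[ taskExecutor-4] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@3d3a28b5"
        );
        // prints {app-third=1, taskExecutor=5}
        System.out.println(countByThreadPrefix(lines));
    }
}
```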
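How can I get the SimpleMessageListenerContainer to pick up the concurrency and prefetch count settings from my Spring configuration?
Spring and/or RabbitMQ does not seem to be using my listener configuration.
I created a sample application that demonstrates the problem: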
https://github.com/DJViking/spring-boot-rabbitmq-demo
Running this Spring Boot sample starts a RabbitMQ instance and configures a queue similar to the one in my application.
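When using Spring Cloud Stream with the functional programming model, I had to use the binder-specific
spring.cloud.stream.rabbit.bindings
configuration properties instead of the spring.rabbitmq.listener
properties: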
spring:
  cloud:
    function:
      definition: processPayment
    stream:
      bindings:
        processPayment-in-0:
          destination: demo.payment-request
          group: demo-payment
        processPayment-out-0:
          destination: demo.payment-response
          group: demo-payment
      rabbit:
        bindings:
          processPayment-in-0:
            consumer:
              prefetch: 10
              single-active-consumer: false
              max-concurrency: 10
              container-type: simple
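As a rough sizing check for the settings above, the number of unacknowledged messages the application can hold at once is bounded by the consumer count times the prefetch, because the prefetch window applies to each consumer's channel. The following is a minimal sketch of that arithmetic; the class and method names are hypothetical, and the starting consumer count of 1 is an assumption based on the binding's `concurrency` property not being set in the YAML.

```java
public class PrefetchBudget {

    /** Upper bound on unacknowledged messages: one prefetch window per consumer. */
    static int maxUnacked(int consumers, int prefetch) {
        return Math.multiplyExact(consumers, prefetch);
    }

    public static void main(String[] args) {
        int prefetch = 10;        // consumer.prefetch from the YAML above
        int maxConcurrency = 10;  // consumer.max-concurrency from the YAML above
        int startConsumers = 1;   // assumption: concurrency is not set, so one consumer at start

        // prints "at start: 10"
        System.out.println("at start: " + maxUnacked(startConsumers, prefetch));
        // prints "at max: 100"
        System.out.println("at max: " + maxUnacked(maxConcurrency, prefetch));
    }
}
```

If the container should start with more than one consumer, as the original `concurrency: 5` setting intended, the initial count would likely need to be set on the binding as well, for example through `spring.cloud.stream.bindings.processPayment-in-0.consumer.concurrency`. That property does not appear in the configuration above and is mentioned here only as a probable counterpart.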