Spring Boot RabbitMQ 配置的并发不起作用

问题描述 投票:0回答:1

我正在尝试提高从 RabbitMQ 队列消费消息的效率。

我无法让我的 Spring 配置与我配置了并发和预取计数的 RabbitMQ SimpleMessageListenerContainer 一起使用。

Spring Boot 3.2.5
春云2023.0.1
春云流
spring-cloud-stream-binder-rabbit
Java 21
RabbitMQ 服务器运行版本 3.6.12

spring:
  application:
    name: app-third
  rabbitmq:
    host: ${RABBITMQ_URL:localhost}
    port: ${RABBITMQ_PORT:5672}
    virtual-host: ${RABBITMQ_VIRTUALHOST:}
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3
        concurrency: 5
        max-concurrency: 25
        prefetch: 10
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          destination: app.third-request
          group: app-third
        process-out-0:
          destination: app.third-response
          group: app-third
  threads:
    virtual:
      enabled: true

我的自定义任务执行器 Bean

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);
    executor.setMaxPoolSize(200);
    executor.setQueueCapacity(400);
    executor.initialize();
    return executor;
}

仅当我创建

SimpleMessageListenerContainer
的自定义 Bean 时,我才能使其正常工作。
RabbitMQ 服务器现在有 6 个消费者(应该是 5 个)。
其中 5 个消费者的预取计数为 10,而 1 个消费者的预取计数为 1。
我怀疑 SimpleMessageListenerContainer 它是来自 Spring 配置的容器,以及我使用自定义 Bean 创建的容器。

@Bean
public SimpleMessageListenerContainer messageListenerContainer(
    ConnectionFactory connectionFactory,
    @Qualifier("taskExecutor") TaskExecutor taskExecutor,
    MessageListenerAdapter listenerAdapter
) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("app.third-request.app-third");
    container.setTaskExecutor(taskExecutor);
    container.setConcurrentConsumers(5);
    container.setMaxConcurrentConsumers(50);
    container.setPrefetchCount(10);
    return container;
}

但是,这不适用于我的消息队列功能。我需要创建一个 MessageListenerAdapter 来读取消息。

日志输出:

[           main] o.s.a.r.l.SimpleMessageListenerContainer : Changing consumers from 1 to 1
[           main] o.s.a.r.l.SimpleMessageListenerContainer : No global properties bean
[           main] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
[    app-third-1] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@13ddaffb: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[    app-third-1] o.s.a.r.listener.BlockingQueueConsumer   : Started on queue 'app.third-request.app-third' with tag amq.ctag-zP9jnSC-NwTYivlWOLf2kw: 
  Consumer@13ddaffb: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,1), 
  conn: Proxy@51172948 Shared Rabbit Connection: SimpleConnection@fe13916 [delegate=amqp://[email protected]:5672/rabbitmq-instance, localPort=55370], acknowledgeMode=AUTO local queue size=0

[           main] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@1ee22768: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-5] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@38a52072: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@746f8520: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@6108fd23: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-4] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@3d3a28b5: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-5] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,3)
[ taskExecutor-3] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,4)
[ taskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,2)
[ taskExecutor-4] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,6)
[ taskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,5)
[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Started on queue 'app.third-request.app-third' with tag amq.ctag-HrAsdYteFh7S8s2KPZsrDg: 
    Consumer@1ee22768: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/rabbitmq-instance,2), 
    conn: Proxy@51172948 Shared Rabbit Connection: SimpleConnection@fe13916 [delegate=amqp://[email protected]:5672/rabbitmq-instance, localPort=55370], acknowledgeMode=AUTO local queue size=0
4 More Started on queue

如何从 Spring 配置中获取 SimpleMessageListenerContainer 以按照并发和预取计数配置工作?

Spring 和/或 RabbitMQ 似乎没有使用我的侦听器配置。

我创建了一个示例应用程序来演示这个问题。
https://github.com/DJViking/spring-boot-rabbitmq-demo 运行这个 Spring Boot 示例将启动一个 RabbitMQ 实例并配置一个类似于我的应用程序的队列。

java spring-boot concurrency rabbitmq spring-amqp
1个回答
0
投票

当将 Spring Cloud Stream 与 函数式编程模型 一起使用时,我必须使用

spring.cloud.stream.binders
配置属性,而不是
spring.rabbitmq.listener
属性

spring:
  cloud:
    function:
      definition: processPayment
    stream:
      bindings:
        processPayment-in-0:
          destination: demo.payment-request
          group: demo-payment
        processPayment-out-0:
          destination: demo.payment-response
          group: demo-payment
      rabbit:
        bindings:
          processPayment-in-0:
            consumer:
              prefetch: 10
              single-active-consumer: false
              max-concurrency: 10
              container-type: simple
© www.soinside.com 2019 - 2024. All rights reserved.