[MessageListener.onMessage在带有Spring Boot的RabbitMQ上不断被调用

问题描述 投票:0回答:1

我有带线程睡眠的MessageListener.onMessage。我正在模拟onMessage的实际处理时间该方法将由上述线程睡眠执行。但是我注意到的是,对于其余消息,它被连续多次调用,直到它们被onMessage方法处理为止。我认为这是低效率的。

实际要排队的消息数:1000

输出匹配次数]

onMessage<<15656
onMessage<<15657
onMessage<<15658
onMessage<<15659
onMessage<<15660
onMessage<<15661
onMessage<<15662
onMessage<<15663

代码块

@Service
class ThreadPooledMessageListener implements MessageListener {
@Autowired
TaskExecutor threadPoolTaskExecutor;

AtomicInteger processedCount = new AtomicInteger();

@Override
public void onMessage(Message message) {
    System.out.println("onMessage<<" + processedCount.incrementAndGet());
    threadPoolTaskExecutor.execute(new MessageProcessor(message));

}
}

class MessageProcessor implements Runnable {
Message processingMessage;

public MessageProcessor(Message message) {
    this.processingMessage = message;
}

@Override
public void run() {
    System.out.println("================================"+ Thread.currentThread().getName());
    System.out.println(processingMessage);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("================================");
}
}

可能的解决方法是什么?


正如@Gary Russell所指出的;问题是我在代码中使用了非弹簧托管容器SimpleMessageListenerContainer。使用spring托管bean修复了它,并在那里定义了并发性。可以正常工作。固定代码段

    @Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue);
    container.setMessageListener(threadPooledMessageListener);
    container.setConcurrentConsumers(4);
    container.start();
    return container;
}

我有带线程睡眠的MessageListener.onMessage。我正在模拟onMessage方法将由上述线程睡眠占用的实际处理时间。但是我注意到的是...

spring-boot rabbitmq spring-amqp
1个回答
1
投票

>I see this as an inefficiency.

© www.soinside.com 2019 - 2024. All rights reserved.