我有带线程睡眠的MessageListener.onMessage。我正在模拟onMessage的实际处理时间该方法将由上述线程睡眠执行。但是我注意到的是,对于其余消息,它被连续多次调用,直到它们被onMessage方法处理为止。我认为这是低效率的。
实际要排队的消息数:1000
输出匹配次数]
onMessage<<15656 onMessage<<15657 onMessage<<15658 onMessage<<15659 onMessage<<15660 onMessage<<15661 onMessage<<15662 onMessage<<15663
代码块
@Service class ThreadPooledMessageListener implements MessageListener { @Autowired TaskExecutor threadPoolTaskExecutor; AtomicInteger processedCount = new AtomicInteger(); @Override public void onMessage(Message message) { System.out.println("onMessage<<" + processedCount.incrementAndGet()); threadPoolTaskExecutor.execute(new MessageProcessor(message)); } } class MessageProcessor implements Runnable { Message processingMessage; public MessageProcessor(Message message) { this.processingMessage = message; } @Override public void run() { System.out.println("================================"+ Thread.currentThread().getName()); System.out.println(processingMessage); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("================================"); } }
可能的解决方法是什么?
正如@Gary Russell所指出的;问题是我在代码中使用了非弹簧托管容器SimpleMessageListenerContainer。使用spring托管bean修复了它,并在那里定义了并发性。可以正常工作。固定代码段
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setMessageListener(threadPooledMessageListener);
container.setConcurrentConsumers(4);
container.start();
return container;
}
我有带线程睡眠的MessageListener.onMessage。我正在模拟onMessage方法将由上述线程睡眠占用的实际处理时间。但是我注意到的是...
>I see this as an inefficiency.