My Spring RabbitMQ SimpleRabbitListenerContainerFactory is defined with a CustomThreadPoolExecutor as its task executor.
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(final ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(messageConverter());
    factory.setAfterReceivePostProcessors(new RabbitMQConsumerInterceptor());
    factory.setBeforeSendReplyPostProcessors(new RabbitMQProducerInterceptor());
    factory.setTaskExecutor(
            new CustomThreadPoolExecutor(
                    20, 30, 60, SECONDS, new LinkedBlockingQueue<>()));
    return factory;
}
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println(":afterExecute"); // This method is never executed
    }
}
For some strange reason, the afterExecute method is never called. Can someone explain why?
It is called when you stop the listener container.
The executor is used in SimpleMessageListenerContainer as follows:
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
That processor contains this loop:
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
    mainLoop();
}
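So, essentially, we never leave that loop while everything is working normally. The task handed to the executor is the long-lived consumer loop itself, not one task per message, so its run() method only returns, and afterExecute only fires, once the consumer is no longer active, for example when the container is stopped.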
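The same behavior can be reproduced without RabbitMQ. The following is only a minimal sketch using plain java.util.concurrent; the names AfterExecuteDemo, CountingExecutor, runDemo and the active flag are made up for illustration and are not part of Spring AMQP. The submitted task loops until a flag is cleared, much like the consumer loop above, and afterExecute is only observed once that loop exits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class AfterExecuteDemo {

    // Hypothetical executor that counts afterExecute invocations.
    static class CountingExecutor extends ThreadPoolExecutor {
        final AtomicInteger afterExecuteCalls = new AtomicInteger();
        final CountDownLatch finished = new CountDownLatch(1);

        CountingExecutor() {
            super(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            afterExecuteCalls.incrementAndGet();
            finished.countDown();
        }
    }

    /** Returns {calls seen while the loop is running, calls seen after it is stopped}. */
    static int[] runDemo() throws InterruptedException {
        CountingExecutor executor = new CountingExecutor();
        AtomicBoolean active = new AtomicBoolean(true); // stands in for "container is running"
        CountDownLatch started = new CountDownLatch(1);

        // One long-lived task, analogous to the consumer loop: run() does not return while active.
        executor.execute(() -> {
            started.countDown();
            while (active.get()) {
                try {
                    Thread.sleep(10); // stands in for processing messages
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        started.await();
        Thread.sleep(200); // the loop keeps running, so afterExecute has not fired
        int whileRunning = executor.afterExecuteCalls.get();

        active.set(false); // analogous to stopping the listener container
        executor.finished.await(5, TimeUnit.SECONDS);
        int afterStop = executor.afterExecuteCalls.get();

        executor.shutdown();
        return new int[] {whileRunning, afterStop};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = runDemo();
        // prints "while running: 0, after stop: 1"
        System.out.println("while running: " + calls[0] + ", after stop: " + calls[1]);
    }
}
```

In other words, afterExecute is a hook around the whole Runnable passed to execute(), so with a listener container it brackets the consumer's lifetime rather than each delivered message.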