我正在使用执行程序服务来确保传入的 jms 消息在将它们写入数据库后得到确认 - (使用 XA 数据源和分布式事务不是我们现在的选择)。为了实现这一目标,我的流程在收到消息时写入数据库,然后使用 executorservice 启动新线程。
@Bean
public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec() {
final ExecutorService executorService = Executors.newCachedThreadPool();
return channels -> channels.executor(executorService);
}
@Bean
public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
}
@Bean
public IntegrationFlow jmsMessageFlow(
@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec)
{
return IntegrationFlow.from(
Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("INCOMING_QUEUE")
.configureListenerContainer(
jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
.errorChannel(genericExceptionChannel)
.outputChannel("messageHandlingChannel"))
// save message in db
.handle(
(payload, headers) -> databaseService.save(payload),
spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
// new thread so that the jms message is acknowledged
.channel(jmsTxCommitingChannelSpec)
.enrichHeaders(errorChannelSpec)
.handle(
(payload, headers) -> messageParser.extractMessageMetadata(payload),
spec -> spec.id("extractMessageMetadata"))
.route(incomingMessageRouter)
.get();
}
我不确定在这种情况下如何关闭执行器服务?或者是否应该关闭它?
考虑使用
ThreadPoolTaskExecutor
代替 Executors.newCachedThreadPool()
并作为 bean。当 Spring 应用程序被叫停时,它有适当的生命周期管理。