/**
 * Supplies the common settings applied to the JMS listener container.
 *
 * @return a customizer that sets the receive timeout to {@code 20_000L}, limits the
 *         container to a single concurrent consumer, and enables session transactions
 */
@Bean
public Consumer<JmsDefaultListenerContainerSpec> jmsListenerContainerSpec() {
    return spec -> spec
            .receiveTimeout(20_000L)
            .maxConcurrentConsumers(1)
            .sessionTransacted(true);
}
/**
 * Provides the factory for the channel that hands messages over to the
 * {@code jmsTaskExecutor} bean.
 *
 * @param jmsTaskExecutor the executor bean qualified as {@code "jmsTaskExecutor"}
 * @return a function that, given the {@link Channels} factory, creates an executor
 *         channel spec backed by {@code jmsTaskExecutor}
 */
@Bean
public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec(
        @Qualifier("jmsTaskExecutor") Executor jmsTaskExecutor) {
    return channelFactory -> {
        return channelFactory.executor(jmsTaskExecutor);
    };
}
/**
 * Thread-pool backed {@code jmsTaskExecutor}, registered only when
 * {@code app.commit-jms-reads-early} is {@code "true"}.
 *
 * @return a {@link ThreadPoolTaskExecutor} whose queue capacity is zero
 */
@Bean(name = "jmsTaskExecutor")
@ConditionalOnProperty(value = "app.commit-jms-reads-early", havingValue = "true")
public Executor jmsTaskExecutor() {
    ThreadPoolTaskExecutor pooledExecutor = new ThreadPoolTaskExecutor();
    // A queue capacity of 0 is meant to imitate Executors.newCachedThreadPool():
    // nothing is queued, and a fresh thread is started whenever no cached
    // thread is free to take the task.
    pooledExecutor.setQueueCapacity(0);
    return pooledExecutor;
}
/**
 * Synchronous {@code jmsTaskExecutor}, registered when
 * {@code app.commit-jms-reads-early} is {@code "false"} or is not set at all.
 *
 * @return a new {@link SyncTaskExecutor}
 */
@Bean(name = "jmsTaskExecutor")
@ConditionalOnProperty(value = "app.commit-jms-reads-early", havingValue = "false", matchIfMissing = true)
public Executor synchronousJmsTaskExecutor() {
    return new SyncTaskExecutor();
}
/**
 * Provides the header-enricher customizer that points the error-channel header at
 * the given channel.
 *
 * @param genericExceptionChannel the channel stored under {@link MessageHeaders#ERROR_CHANNEL}
 * @return a customizer that adds the {@code ERROR_CHANNEL} header to the enricher spec
 */
@Bean
public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
    return enricher -> {
        enricher.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
    };
}
/**
 * Assembles the integration flow that starts at the JMS destination
 * {@code "INCOMING_QUEUE"} and ends at the JMS destination returned by
 * {@code getQueueName()}.
 *
 * <p>Steps, in order: save the payload through {@code databaseService.save},
 * pass the message through the channel created by {@code jmsTxCommitingChannelSpec},
 * enrich headers with {@code errorChannelSpec}, run
 * {@code messageParser.extractMessageMetadata}, run {@code databaseService.update},
 * and finally send through the JMS outbound adapter.
 *
 * <p>NOTE(review): {@code jmsListenerContainerSpec}, {@code genericExceptionChannel},
 * {@code databaseService}, {@code messageRetryAdvice}, {@code errorChannelSpec} and
 * {@code messageParser} are not parameters of this method; presumably they are fields
 * of the enclosing class - confirm.
 *
 * @param connectionFactory the connection factory qualified as {@code "jmsConnectionFactory"},
 *        used for both the inbound and the outbound JMS adapter
 * @param jmsTxCommitingChannelSpec factory for the channel placed after the save step
 * @return the assembled {@link IntegrationFlow}
 */
@Bean
public IntegrationFlow jmsMessageFlow(
@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec)
{
return IntegrationFlow.from(
Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("INCOMING_QUEUE")
// apply the shared container settings, then give the container the id "ListenerContainer"
.configureListenerContainer(
jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
.errorChannel(genericExceptionChannel)
.outputChannel("messageHandlingChannel"))
// persist the incoming payload; the handler is given messageRetryAdvice as advice
.handle(
(payload, headers) -> databaseService.save(payload),
spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
// hand-off point intended to let the JMS message be acknowledged before the rest runs.
// NOTE(review): this only moves work to another thread when the supplied channel is
// backed by an asynchronous executor; with a SyncTaskExecutor the remaining steps
// run on the calling thread - confirm against the active jmsTaskExecutor bean.
.channel(jmsTxCommitingChannelSpec)
// set the error-channel header for the steps that follow
.enrichHeaders(errorChannelSpec)
.handle(
(payload, headers) -> messageParser.extractMessageMetadata(payload),
spec -> spec.id("extractMessageMetadata"))
// update the stored record with the result of the previous step
.handle(
(payload, headers) -> databaseService.update(payload))
// send the result to the destination named by getQueueName()
.handle(Jms.outboundAdapter(connectionFactory)
.destination(getQueueName())
.configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec.id("jmsTemplate")))
.get();
}
每当未设置属性 `app.commit-jms-reads-early`(或其值为 false)时,此处使用的就是 `SyncTaskExecutor`。
摘自 Spring 文档对 `SyncTaskExecutor` 的说明:此实现不会异步执行调用;相反,每次调用都在调用线程中进行。它主要用于不需要多线程的场景,例如简单的测试用例。
您是对的,对于您的用例,确实存在一个问题。请看它的实现:
/**
 * Runs the given task immediately on the calling thread.
 *
 * @param task the task to run; rejected by the assertion below when {@code null}
 */
@Override
public void execute(Runnable task) {
Assert.notNull(task, "Runnable must not be null");
// Direct call to run(): no new thread is started, so the caller's thread executes the task.
task.run();
}
因此,这种逻辑的问题在于:您的事务不会提交,因为流程始终没有离开 JMS 线程,其余的流程全部在该线程中执行完毕。