在弹簧积分流中,使用SynctasKeexecutor与使用单个螺纹流相同?

问题描述 投票:0回答:1
我们想提交JMS根据流程中间的属性读取交易。我们的实施目前使用执行人实现这一目标。

@Bean public Consumer<JmsDefaultListenerContainerSpec> jmsListenerContainerSpec() { return containerSpec -> { containerSpec.receiveTimeout(20_000L); containerSpec.maxConcurrentConsumers(1); containerSpec.sessionTransacted(true); }; } @Bean public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec( @Qualifier("jmsTaskExecutor") Executor jmsTaskExecutor) { return channels -> channels.executor(jmsTaskExecutor); } @Bean(name = "jmsTaskExecutor") @ConditionalOnProperty(value = "app.commit-jms-reads-early", havingValue = "true") public Executor jmsTaskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); // setting this to 0 mimics the behavior of Executors.newCachedThreadPool(); // where no tasks are queued and a new thread is created as needed if none // are available in the cache taskExecutor.setQueueCapacity(0); return taskExecutor; } @Bean(name = "jmsTaskExecutor") @ConditionalOnProperty(value = "app.commit-jms-reads-early", havingValue = "false", matchIfMissing = true) public Executor synchronousJmsTaskExecutor() { SyncTaskExecutor taskExecutor = new SyncTaskExecutor(); return taskExecutor; } @Bean public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) { return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel); } @Bean public IntegrationFlow jmsMessageFlow( @Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory, Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec) { return IntegrationFlow.from( Jms.messageDrivenChannelAdapter(connectionFactory) .destination("INCOMING_QUEUE") .configureListenerContainer( jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer"))) .errorChannel(genericExceptionChannel) .outputChannel("messageHandlingChannel")) // save message in db .handle( (payload, headers) -> databaseService.save(payload), spec -> spec.advice(messageRetryAdvice).id("persistClientMessage")) // new thread so that the jms message is acknowledged .channel(jmsTxCommitingChannelSpec) .enrichHeaders(errorChannelSpec) .handle( (payload, headers) -> messageParser.extractMessageMetadata(payload), spec -> spec.id("extractMessageMetadata")) .handle( (payload, headers) -> databaseService.update(payload)) .handle(Jms.outboundAdapter(connectionFactory) .destination(getQueueName()) .configureJmsTemplate(jmsTemplateSpec -> jmsTemplateSpec.id("jmsTemplate"))) .get(); }

每当未设置属性时,请在此处使用
SyncTaskExecutor

从spring文档
Synctaskexecutor:此实现不会异步运行调用。相反,每个调用发生在调用线程中。它主要用于不需要多线程的情况,例如在简单的测试用例中。
    

您是对的。对于您的用例,确实存在一个问题。 看到其实施:

app.commit-jms-reads-early

不像
spring spring-integration
1个回答
0
投票

@Override
public void execute(Runnable task) {
    Assert.notNull(task, "Runnable must not be null");
    task.run();
}

,因此,使用此逻辑的问题,您的交易不会进行,因为您只是不会离开JMS线程并完全执行其余的流程。


最新问题
© www.soinside.com 2019 - 2025. All rights reserved.