我的 Apache Camel 路线中有以下代码行:
.split().method(SomeDataCacheSplitter.class, "split").streaming()
.marshal(gsonDataFormat)
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.executorService(new SynchronousExecutorService())
.completionSize(1000)
.completionPredicate(someDataCache.getEndOfFilePredicate())
.to("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess")
.end()
.end()
当使用 Java 11 和 Camel 3.17.0 执行此代码时,我可以在日志中看到所有内容都在单个线程中执行:
2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ...
2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ...
2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ...
但是,当使用 Java 21 和 Camel 4.8.2 执行相同的代码时,它会以 10 个线程并行执行:
2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-9] org.apache.http.wire ...
2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-7] org.apache.http.wire ...
2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-6] org.apache.http.wire ...
问题
您知道为什么会出现这种情况以及如何修复它以便所有内容仅在单个线程中运行吗?
总的来说,该代码旨在将数据发送到 Elasticsearch 并在批量上传时创建索引。之后,路由中有一个 .process 步骤,它将新索引分配给别名。如果没有同步执行,我会得到一个 index_not_found_Exception ,原因是“没有这样的索引”,因为索引还不存在,最坏的情况是别名指向损坏的索引,以防以后批量上传到弹性时出现错误。
任何确保 Camel 4 和 Java 21 中单线程执行的见解或解决方案将不胜感激。谢谢!
在 Camel 3 中,如果使用
.executorService(new SynchronousExecutorService())
,聚合逻辑会遵循显式提供的禁用线程的执行器服务。
在 Camel 4 中
Aggregate
如果没有显式设置正确配置的执行器服务,EIP(企业集成模式)会将并发委托给默认线程池执行器。即使您通过了SynchronousExecutorService
,由于 Camel 4 中的内部行为变化,它也可能会被忽略或区别对待。
Camel 4 使用各种优化来提高管道并发性,影响 split、aggregate 和 marshal 等组件。如果存在异步下游操作(例如发送到
elasticsearch
),它们可能会导致并行性 - 即使没有明确意图。
您可以尝试通过显式设置
.parallelProcessing(false)
来强制拆分器执行单线程执行
.split().method(SomeDataCacheSplitter.class, "split")
.streaming()
.parallelProcessing(false) // Ensure only a single thread processes the split
.marshal(gsonDataFormat)
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.executorService(new SynchronousExecutorService())
.completionSize(1000)
.completionPredicate(someDataCache.getEndOfFilePredicate())
.to("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess")
.end()
.end()