我想发送多条消息,这些消息将异步遍历同一路由,并能够知道所有处理何时完成。
ProducerTemplate#asyncRequestBody
,使用 InOut 模式,以便在返回的 get
对象上调用
Future
将阻塞,直到路由终止。
到目前为止一切顺利,每个请求都异步发送到路由,并且循环所有 Future 调用 get 方法将阻塞,直到所有请求都被阻塞。 我的路线已经完成。
问题是,虽然请求是异步发送的,但我希望它们也能并行使用。
例如,考虑 P 是
ProducerTemplate
,Rn 发出请求,En 是端点 - 我想要的是:
-> R0 -> from(E1).to(E2).to(E3) : done.
/
P -> R1 -> from(E1).to(E2).to(E3) : done.
\
-> R2 -> from(E1).to(E2).to(E3) : done.
^__ Requests consumed in parallel.
经过一些研究,我偶然发现了竞争消费者,它并行执行,添加更多消费者。
但是,由于同时有多个执行,这会减慢每个路由的执行速度,从而导致一些
ExchangeTimedOutException
:
The OUT message was not received within: 20000 millis due reply message with correlationID...
不足为奇,因为我正在发送一个 InOut 请求。但实际上,我并不关心回应,我只是用它来知道 当我的路线终止时。我会使用 InOnly (
ProducerTemplate#asyncSendBody
),但调用 Future#get
不会阻塞,直到
整个任务完成。
是否有另一种方法可以异步发送请求并检测请求何时全部完成?
请注意,在我的情况下,更改超时不是一个选项。
我的第一直觉是建议使用 NotifyBuilder 来跟踪处理,更具体地说,使用
whenBodiesDone
来定位特定主体。
编辑:
这是一个简单的实现,但它确实证明了一点:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Component
public static class ParallelProcessingRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("seda:test?concurrentConsumers=5")
.routeId("parallel")
.log("Received ${body}, processing")
.delay(5000)
.log("Processed ${body}")
.stop();
from("timer:testStarter?delay=3000&period=300000")
.routeId("test timer")
.process(exchange -> {
// messages we want to track
List<Integer> toSend = IntStream.range(0, 5).boxed().collect(toList());
NotifyBuilder builder = new NotifyBuilder(getContext())
.fromRoute("parallel")
.filter(e -> toSend.contains(e.getIn().getBody(Integer.class)))
.whenDone(toSend.size())
.create();
ProducerTemplate template = getContext().createProducerTemplate();
// messages we do not want to track
IntStream.range(10, 15)
.forEach(body -> template.sendBody("seda:test", body));
toSend.forEach(body -> template.sendBody("seda:test", body));
exchange.getIn().setBody(builder.matches(1, TimeUnit.MINUTES));
})
.log("Matched? ${body}");
}
}
}
这是日志示例:
2016-08-06 11:45:03.861 INFO 27410 --- [1 - seda://test] parallel : Received 10, processing
2016-08-06 11:45:03.861 INFO 27410 --- [5 - seda://test] parallel : Received 11, processing
2016-08-06 11:45:03.864 INFO 27410 --- [2 - seda://test] parallel : Received 12, processing
2016-08-06 11:45:03.865 INFO 27410 --- [4 - seda://test] parallel : Received 13, processing
2016-08-06 11:45:03.866 INFO 27410 --- [3 - seda://test] parallel : Received 14, processing
2016-08-06 11:45:08.867 INFO 27410 --- [1 - seda://test] parallel : Processed 10
2016-08-06 11:45:08.867 INFO 27410 --- [3 - seda://test] parallel : Processed 14
2016-08-06 11:45:08.867 INFO 27410 --- [4 - seda://test] parallel : Processed 13
2016-08-06 11:45:08.868 INFO 27410 --- [2 - seda://test] parallel : Processed 12
2016-08-06 11:45:08.868 INFO 27410 --- [5 - seda://test] parallel : Processed 11
2016-08-06 11:45:08.870 INFO 27410 --- [1 - seda://test] parallel : Received 0, processing
2016-08-06 11:45:08.872 INFO 27410 --- [4 - seda://test] parallel : Received 2, processing
2016-08-06 11:45:08.872 INFO 27410 --- [3 - seda://test] parallel : Received 1, processing
2016-08-06 11:45:08.872 INFO 27410 --- [2 - seda://test] parallel : Received 3, processing
2016-08-06 11:45:08.872 INFO 27410 --- [5 - seda://test] parallel : Received 4, processing
2016-08-06 11:45:13.876 INFO 27410 --- [1 - seda://test] parallel : Processed 0
2016-08-06 11:45:13.876 INFO 27410 --- [3 - seda://test] parallel : Processed 1
2016-08-06 11:45:13.876 INFO 27410 --- [4 - seda://test] parallel : Processed 2
2016-08-06 11:45:13.876 INFO 27410 --- [5 - seda://test] parallel : Processed 4
2016-08-06 11:45:13.876 INFO 27410 --- [2 - seda://test] parallel : Processed 3
2016-08-06 11:45:13.877 INFO 27410 --- [r://testStarter] test timer : Matched? true
您会注意到
NotifyBuilder
在结果匹配后立即返回结果。
如果您知道您正在使用的每批消息中都有 X 条消息,您可以在并行处理结束时使用聚合器。对于您的示例,每组消息都将具有自己唯一的标头标签,聚合器将拾取该标头标签。在处理所有消息并且所有消息都最终到达聚合器后,您可以将消息聚合为您想要的任何格式并返回它们。