如何并行消费多条消息并检测所有执行何时完成?

问题描述 投票:0回答:2

我想发送多条消息,这些消息将异步遍历同一路由,并能够知道所有处理何时完成。

由于我需要知道每条路线何时终止,所以我考虑使用

ProducerTemplate#asyncRequestBody
,使用 InOut 模式,以便在返回的
get
 对象上调用 
Future
将阻塞,直到路由终止。

到目前为止一切顺利,每个请求都异步发送到路由,并且循环所有 Future 调用 get 方法将阻塞,直到所有请求都被阻塞。 我的路线已经完成。

问题是,虽然请求是异步发送的,但我希望它们也能并行使用。

例如,考虑 P 是

ProducerTemplate
,Rn 发出请求,En 是端点 - 我想要的是:

  ->   R0 -> from(E1).to(E2).to(E3) : done.
 /
P ->   R1 -> from(E1).to(E2).to(E3) : done.
 \  
  ->   R2 -> from(E1).to(E2).to(E3) : done.

        ^__ Requests consumed in parallel.

经过一些研究,我偶然发现了竞争消费者,它并行执行,添加更多消费者。

但是,由于同时有多个执行,这会减慢每个路由的执行速度,从而导致一些

ExchangeTimedOutException
:

The OUT message was not received within: 20000 millis due reply message with correlationID...

不足为奇,因为我正在发送一个 InOut 请求。但实际上,我并不关心回应,我只是用它来知道 当我的路线终止时。我会使用 InOnly (

ProducerTemplate#asyncSendBody
),但调用
Future#get
不会阻塞,直到 整个任务完成。

是否有另一种方法可以异步发送请求并检测请求何时全部完成?

请注意,在我的情况下,更改超时不是一个选项。

java apache-camel
2个回答
2
投票

我的第一直觉是建议使用 NotifyBuilder 来跟踪处理,更具体地说,使用

whenBodiesDone
来定位特定主体。

编辑:

这是一个简单的实现,但它确实证明了一点:

@SpringBootApplication
public class DemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }

  @Component
  public static class ParallelProcessingRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
      from("seda:test?concurrentConsumers=5")
          .routeId("parallel")
          .log("Received ${body}, processing")
          .delay(5000)
          .log("Processed ${body}")
          .stop();

      from("timer:testStarter?delay=3000&period=300000")
          .routeId("test timer")
          .process(exchange -> {
            // messages we want to track
            List<Integer> toSend = IntStream.range(0, 5).boxed().collect(toList());

            NotifyBuilder builder = new NotifyBuilder(getContext())
                .fromRoute("parallel")
                .filter(e -> toSend.contains(e.getIn().getBody(Integer.class)))
                .whenDone(toSend.size())
                .create();

            ProducerTemplate template = getContext().createProducerTemplate();

            // messages we do not want to track
            IntStream.range(10, 15)
                .forEach(body -> template.sendBody("seda:test", body)); 

            toSend.forEach(body -> template.sendBody("seda:test", body)); 

            exchange.getIn().setBody(builder.matches(1, TimeUnit.MINUTES));
          })
          .log("Matched? ${body}");
    }
  }
}

这是日志示例:

2016-08-06 11:45:03.861  INFO 27410 --- [1 - seda://test] parallel                                 : Received 10, processing
2016-08-06 11:45:03.861  INFO 27410 --- [5 - seda://test] parallel                                 : Received 11, processing
2016-08-06 11:45:03.864  INFO 27410 --- [2 - seda://test] parallel                                 : Received 12, processing
2016-08-06 11:45:03.865  INFO 27410 --- [4 - seda://test] parallel                                 : Received 13, processing
2016-08-06 11:45:03.866  INFO 27410 --- [3 - seda://test] parallel                                 : Received 14, processing
2016-08-06 11:45:08.867  INFO 27410 --- [1 - seda://test] parallel                                 : Processed 10
2016-08-06 11:45:08.867  INFO 27410 --- [3 - seda://test] parallel                                 : Processed 14
2016-08-06 11:45:08.867  INFO 27410 --- [4 - seda://test] parallel                                 : Processed 13
2016-08-06 11:45:08.868  INFO 27410 --- [2 - seda://test] parallel                                 : Processed 12
2016-08-06 11:45:08.868  INFO 27410 --- [5 - seda://test] parallel                                 : Processed 11
2016-08-06 11:45:08.870  INFO 27410 --- [1 - seda://test] parallel                                 : Received 0, processing
2016-08-06 11:45:08.872  INFO 27410 --- [4 - seda://test] parallel                                 : Received 2, processing
2016-08-06 11:45:08.872  INFO 27410 --- [3 - seda://test] parallel                                 : Received 1, processing
2016-08-06 11:45:08.872  INFO 27410 --- [2 - seda://test] parallel                                 : Received 3, processing
2016-08-06 11:45:08.872  INFO 27410 --- [5 - seda://test] parallel                                 : Received 4, processing
2016-08-06 11:45:13.876  INFO 27410 --- [1 - seda://test] parallel                                 : Processed 0
2016-08-06 11:45:13.876  INFO 27410 --- [3 - seda://test] parallel                                 : Processed 1
2016-08-06 11:45:13.876  INFO 27410 --- [4 - seda://test] parallel                                 : Processed 2
2016-08-06 11:45:13.876  INFO 27410 --- [5 - seda://test] parallel                                 : Processed 4
2016-08-06 11:45:13.876  INFO 27410 --- [2 - seda://test] parallel                                 : Processed 3
2016-08-06 11:45:13.877  INFO 27410 --- [r://testStarter] test timer                               : Matched? true

您会注意到

NotifyBuilder
在结果匹配后立即返回结果。


-1
投票

如果您知道您正在使用的每批消息中都有 X 条消息,您可以在并行处理结束时使用聚合器。对于您的示例,每组消息都将具有自己唯一的标头标签,聚合器将拾取该标头标签。在处理所有消息并且所有消息都最终到达聚合器后,您可以将消息聚合为您想要的任何格式并返回它们。

© www.soinside.com 2019 - 2024. All rights reserved.