我有一个春季整合流,与JDBCPollingChannelAdapter进行了轮询,因此: IntegrationFlow.from(buildjdbcmessagesource(), C-> C.Poller(Pol ...

问题描述 投票:0回答:0

IntegrationFlow.from(buildJdbcMessageSource(), c -> c.poller(Pollers.fixedDelay(100).maxMessagesPerPoll(1)))

结果带有

List<>
有效载荷的消息,然后是
.split()

。 我需要并行处理大量消息。但是,每个消息属于特定组,由ID识别,并具有序列号。虽然可以同时处理来自不同组的消息,但必须严格维护每个组中的订单。下一个民意调查可能包含具有同一组ID的新消息,但是如果仍在处理同一组的消息,则不应处理这些消息。

组ID的数量可能会有所不同,并且可配置,但通常会大约有30组。
I使用了A
PartitionChannel
,并且成功地维护了每个组中的序列顺序。但是,它不会调节轮询速率,这可能比实际消息流的速度要快得多。结果,数据库中积累了大量消息,其状态为“集成”(由

JdbcPollingChannelAdapter

updateQuery

)。

我不知道

PartitionChannel

是否是最佳方法。
我想到的解决方案:

以特定数量的消息的方式阻止
PartitionChannel

;

突出了民意调查并使用
PartitionChannel

;

在启动上以
    IntegrationFlowContext
  • 的方式在启动上创建流量,并根据组ID将消息路由到每个流程;
    使用
  • .aggregate
  • -但不确定它将保证在具有相同组ID的消息的民意调查中订单。
    我使用Spring Boot3.3.4.
  • 在确保在同一消息组ID中维护订单时并行处理消息的好解决方案是什么?
  • yes,
  • PartitionChannel
  • 完全是为这种任务而设计的:平行发射,但如果分区相同,则是顺序的。本质上,具有同一分区的消息在同一线程中处理。但是,是的,由于该频道将其工作转移到其他线程,因此原始的线程(在我们的情况下,民意测验器)可以自由地做任何其他事情。
    您说这有点“过度播放”,而且根据您的配置,这确实不足为奇,您需要研究此时可以做些什么。对于您来说,较长
    fixedDelay(100)
  • 可能是一个简单的技巧。
无论您如何查看

条件投票

在运行时调整触发器。或者,您可以简单地决定停止该端点的端点,然后在当前工作负载或背压时重新启动它。

spring-integration
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.