IntegrationFlow.from(buildJdbcMessageSource(),
c -> c.poller(Pollers.fixedDelay(100).maxMessagesPerPoll(1)))
List<>
有效载荷的消息,然后是
.split()
。 我需要并行处理大量消息。但是,每个消息属于特定组,由ID识别,并具有序列号。虽然可以同时处理来自不同组的消息,但必须严格维护每个组中的订单。下一个民意调查可能包含具有同一组ID的新消息,但是如果仍在处理同一组的消息,则不应处理这些消息。
组ID的数量可能会有所不同,并且可配置,但通常会大约有30组。I使用了A
PartitionChannel
,并且成功地维护了每个组中的序列顺序。但是,它不会调节轮询速率,这可能比实际消息流的速度要快得多。结果,数据库中积累了大量消息,其状态为“集成”(由JdbcPollingChannelAdapter
updateQuery
)。
我不知道PartitionChannel
是否是最佳方法。我想到的解决方案:
以特定数量的消息的方式阻止
PartitionChannel
;
突出了民意调查并使用
PartitionChannel
;在启动上以
IntegrationFlowContext
.aggregate
我使用Spring Boot3.3.4.
PartitionChannel
您说这有点“过度播放”,而且根据您的配置,这确实不足为奇,您需要研究此时可以做些什么。对于您来说,较长
fixedDelay(100)
条件投票
在运行时调整触发器。或者,您可以简单地决定停止该端点的端点,然后在当前工作负载或背压时重新启动它。