在我的 SpringBoot 应用程序中,我编写了一个聚合器,如下所示。我希望在 2 分钟后立即发布消息。但是下面的代码似乎不起作用。 但是,当发布策略从 "timoutReleaseStrategy' 更改为 "size() ==1" 时,消息会一条一条地发布。但是,我希望所有消息在 2 分钟后发布。
我用的spring版本是5.3.20
<int:aggregator
output-channel="splitterInChannel"
send-partial-result-on-expiry="true"
message-store="customMessageStore"
correlation-strategy-expression="headers['type']
ref="taskAggregationService"
method="aggregateTask"
release-strategy-expression="timeoutReleaseStrategy"
expire-groups-upon-completion="true"
expire-groups-upon-timeout="true"
id="customAggregator"
input-channel="aggregatorInChannel" >
</int:aggregator>
<bean id="customMessageStore" class="org....SimpleMessageStore"/>
<bean id="timeoutReleaseStrategy" class="org....TimeoutCountSequenceSizeReleaseStrategy">
<constructor-arg name="threshold" value="100"/>
<constructor-arg name="timeout" value="120000"/> // 2 minutes
</bean>
根据 Spring 文档,如果要覆盖聚合器的默认发布策略,则需要提供自定义 ReleaseStrategy 实现。默认释放策略仅在序列中包含的所有消息都存在时才释放一个组。
如果你想根据超时事件发布消息,你还需要一个单独的 Reaper 组件。聚合器本身是一个被动组件,只在收到消息后根据发布策略的结果来决定是否发送消息。
您可以在此链接https://docs.spring.io/spring-integration/docs/2.0.0.RC1/reference/html/aggregator.html 中找到有关聚合 API 以及如何自定义它的更多详细信息。
您可以使用 lambda 表达式来定义基于某些条件的发布策略。例如,如果你想在 FileSplitter.FileMarker.Mark.END 到达时释放一个组,你可以使用这个代码:
.releaseStrategy(group -> group.getMessages()
.stream()
.anyMatch(m -> FileSplitter.FileMarker.Mark.END.name()
.equals(m.getHeaders().get(FileHeaders.MARKER))))
也可以参考Spring文档
另一个选项是使用 groupTimeout 或 groupTimeoutExpression 属性来指定在没有足够消息的情况下释放组的时间限制 Spring Integration聚合器基于最后修改的发布策略