为什么Spring Integration的TimeoutCountSequenceSizeReleaseStrategy在用户设置的时间过去后不释放消息?

问题描述 投票:0回答:1

在我的 SpringBoot 应用程序中,我编写了一个聚合器,如下所示。我希望在 2 分钟后立即发布消息。但是下面的代码似乎不起作用。 但是,当发布策略从 "timoutReleaseStrategy' 更改为 "size() ==1" 时,消息会一条一条地发布。但是,我希望所有消息在 2 分钟后发布。

我用的spring版本是5.3.20

<int:aggregator 
      output-channel="splitterInChannel"
      send-partial-result-on-expiry="true"
      message-store="customMessageStore"
      correlation-strategy-expression="headers['type']
      ref="taskAggregationService"
      method="aggregateTask"
      release-strategy-expression="timeoutReleaseStrategy"    
      expire-groups-upon-completion="true"
      expire-groups-upon-timeout="true"
      id="customAggregator"
      input-channel="aggregatorInChannel" >
</int:aggregator>

<bean id="customMessageStore"  class="org....SimpleMessageStore"/>
<bean id="timeoutReleaseStrategy" class="org....TimeoutCountSequenceSizeReleaseStrategy">
   <constructor-arg name="threshold" value="100"/>
      <constructor-arg name="timeout" value="120000"/> // 2 minutes
</bean>
java spring spring-boot spring-integration
1个回答
0
投票

根据 Spring 文档,如果要覆盖聚合器的默认发布策略,则需要提供自定义 ReleaseStrategy 实现。默认释放策略仅在序列中包含的所有消息都存在时才释放一个组。

如果你想根据超时事件发布消息,你还需要一个单独的 Reaper 组件。聚合器本身是一个被动组件,只在收到消息后根据发布策略的结果来决定是否发送消息。

您可以在此链接https://docs.spring.io/spring-integration/docs/2.0.0.RC1/reference/html/aggregator.html 中找到有关聚合 API 以及如何自定义它的更多详细信息。

您可以使用 lambda 表达式来定义基于某些条件的发布策略。例如,如果你想在 FileSplitter.FileMarker.Mark.END 到达时释放一个组,你可以使用这个代码:

.releaseStrategy(group -> group.getMessages()
    .stream()
    .anyMatch(m -> FileSplitter.FileMarker.Mark.END.name()
        .equals(m.getHeaders().get(FileHeaders.MARKER))))

也可以参考Spring文档

另一个选项是使用 groupTimeout 或 groupTimeoutExpression 属性来指定在没有足够消息的情况下释放组的时间限制 Spring Integration聚合器基于最后修改的发布策略

© www.soinside.com 2019 - 2024. All rights reserved.