基于我之前的问题,我创建了以下流程:
<int:channel id="output.buffer.channel"/>
<int:chain id="pollingBaseChain" input-channel="pollingInput">
<int:poller ref="pollingInputPoller"/>
<int:gateway id="inputChainGateway" request-channel="input.buffer.channel" reply-channel="output.buffer.channel"/>
<int:service-activator id="outboundRoutingService" ref="outboundMessageRouterService" method="forwardMessage"/>
</int:chain>
<int:chain id="input.chain" input-channel="input.buffer.channel" output-channel="output.buffer.channel">
<int:poller ref="inputPoller"/>
<!-- Various service activators/transfomers -->
<int:splitter id="messageSplitter" ref="messageSplitterSequence" apply-sequence="false"/>
<int:transformer id="outboundEntries" ref="routingService" method="prepareOutboundEntries"/>
</int:chain>
在服务激活器outboundRoutingService
内部发生的代码非常少:
public void forwardMessage(Message message, @Header(value = "nextChannel", required = false) MessageChannel channel) {
logger.info("Received message for routing. Channel is: {}, message is: {}", channel, message);
if(channel != null) {
channel.send(message);
}
}
现在我向流中发送25条消息,它们被pollingBaseChain
拾取,通过input.chain
转发到gateway
并在那里处理。在该链中,他们被分成5条消息,125条消息正在离开input.chain
。来自forwardMessage
的outboundRoutingService
记录了25条消息。我相信这是因为网关试图通过他们的ID来匹配消息,所以只有25条原始消息被拾取(以及其他消息会发生什么?)并被转发。
这是我的第一个问题,有没有办法,input.chain
发出的所有消息继续在pollingBaseChain
中流动并转发到outboundRoutingService
服务激活器?
我的第二个问题是,当有25条消息到达forwardMessage
并且非空的channel
时,只有1条消息被通道发送到forwardMessage
。这个频道是一个QueueChannel
,其队列大小超过25个。那些消息会丢失在哪里?
网关请求 - 回复基于TemporaryReplyChannel
头部的replyChannel
,它本质上是一个private final CountDownLatch replyLatch = new CountDownLatch(1);
。所以,它实际上是一个请求的一个回复。该网关不了解下游及其生产许多按摩的可能性。
为了兑现“一对一”合同,您需要考虑在发送到回复频道之前聚合所有这些消息。
在Docs:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-routing-chapter.html#aggregator中查看有关聚合器的更多信息
您可以考虑在网关之后将结果拆分回一堆消息。