我们使用 Spring Integration 6.3.3 聚合器模式,带有 JDBCMessageStore 和过期超时。我们还希望在到期时发送部分组,我们将其视为错误条件。我们希望在发生此错误时记录/发出警报。
IntegrationFlow
中的下游组件在收到聚合消息时了解消息是完整组还是部分消息的最佳方式是什么?
这是一个简化的示例:
@Autowired
PetGroupHandler petGroupHandler;
@Bean
public JdbcMessageStore jdbcMessageStore(
@Qualifier("mydbDataSource") DataSource dataSource) {
JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
messageStore.setRegion("petstore-pubsub");
return messageStore;
}
@Bean
IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
.filter(petOfInterestFilter, "shouldProcess")
.aggregate(aggregatorSpec -> aggregatorSpec
.messageStore(jdbcMessageStore)
.outputProcessor(petOutputProcessor)
.expireGroupsUponCompletion(true)
.groupTimeout(300 * 1000) // 5 minutes
.sendPartialResultOnExpiry(true) // send partial group
.correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
.releaseStrategy(group -> group.size()>=2))
.handle(petGroupHandler, "handle")
.get();
当上面示例中的
PetGroupHandler
收到 Pet
列表时,有没有办法知道 Pet
列表是完整组还是部分组,以便处理程序可以针对完整组和部分组执行不同的操作?
我们尝试实现一个
OutputProcessor
,它注入一个布尔标头,指示该组是否完整。但这不起作用,因为看起来该组没有在 AbstractCorrelatingMessageHandler.java
中完成,即在调用发布策略之后和在 OutputProcessor
方法中调用 completeGroup()
之前。
if (this.releaseStrategy.canRelease(messageGroup)) {
Collection<Message<?>> completedMessages = null;
try {
noOutput = false;
completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
}
finally {
// Possible clean (implementation dependency) up
// even if there was an exception processing messages
afterRelease(messageGroup, completedMessages);
}
if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
removeEmptyGroupAfterTimeout(groupIdUuid, this.minimumTimeoutForEmptyGroups);
}
}
在上面的例子中,发布策略非常简单。你可能会说我可以只检查列表大小。但在我们的实际应用中,发布策略是比较复杂的。我希望避免我们的处理程序了解有关发布策略的任何信息。
我有点希望
.sendPartialResultOnExpiry()
有一个通道或处理程序参数,部分组将发送到该通道或处理程序参数。但没有办法指定这一点。
任何有关如何执行此操作的建议都将受到赞赏!谢谢!
再进一步
AbstractCorrelatingMessageHandler
有一个逻辑:
protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock) {
this.logger.debug(() -> "Expiring MessageGroup with correlationKey[" + correlationKey + "]");
if (this.sendPartialResultOnExpiry) {
this.logger.debug(() -> "Prematurely releasing partially complete group with key ["
+ correlationKey + "] to: " + getOutputChannel());
completeGroup(correlationKey, group, lock);
}
else {
this.logger.debug(() -> "Discarding messages of partially complete group with key ["
+ correlationKey + "] to: "
+ (this.discardChannelName != null ? this.discardChannelName : this.discardChannel));
if (this.releaseLockBeforeSend) {
lock.unlock();
}
group.getMessages()
.forEach(this::discardMessage);
}
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(
new MessageGroupExpiredEvent(this, correlationKey, group.size(),
new Date(group.getLastModified()), new Date(), !this.sendPartialResultOnExpiry));
}
}
因此,如果
sendPartialResultOnExpiry(false)
(默认),部分完整组中的所有消息都将被丢弃。
有关更多信息,请参阅文档:https://docs.spring.io/spring-integration/reference/aggregator.html#aggregator-xml。
注意第 8 点:
指示一旦包含的 MessageGroup 过期,应聚合过期消息并将其发送到“输出通道”或“replyChannel”(请参阅 MessageGroupStore.expireMessageGroups(long))。使 MessageGroup 过期的一种方法是配置 MessageGroupStoreReaper。但是,您也可以通过调用 MessageGroupStore.expireMessageGroups(timeout) 来使 MessageGroup 过期。您可以通过控制总线操作来完成此操作,或者如果您有对 MessageGroupStore 实例的引用,则可以通过调用 expireMessageGroups(timeout) 来完成此操作。否则,该属性本身不会执行任何操作。它仅用作是否丢弃或发送到输出或回复通道任何仍位于 MessageGroup 中即将过期的消息的指示符。可选(默认为 false)。注意:此属性可能更合适地称为 send-partial-result-on-timeout,因为如果 expire-groups-upon-timeout 设置为 false,则该组实际上可能不会过期。