错误信息。原因是:java.lang.IllegalStateException: Null correlation not allowed. 也许是CorrelationStrategy失败了?
我的实现。
@Bean
public IntegrationFlow start() {
return IntegrationFlows
.from("getOrders")
.split()
.publishSubscribeChannel(c -> c.subscribe(s -> s.channel(q -> q.queue(1))
.<Order, Message<?>>transform(p -> MessageBuilder.withPayload(new Item(p.getItems())).setHeader(ORDERID, p.getOrderId()).build())
.split(Item.class, Item::getItems)
.transform() // let's assume, an object created for each item, let's say ItemProperty to the object.
// Transform returns message; MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
.aggregate() // so, here aggregate method needs to aggregate ItemProperties.
.handle() // handler gets List<ItemProperty> as an input.
))
.get();
}
两个分路器都能正常工作。我也测试过第二台分路器后的变压器,工作正常。但是,到了汇总的时候,却出现了故障。我这里缺什么?
你忽略了这样一个事实 transformer
是这种类型的端点,它可以原封不动地处理整个消息。如果你自己创建了一个消息,它不会修改它。MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
你只是在分割器之后错过了重要的序列细节头。因此,之后的聚合器不知道该怎么处理你的消息,因为你为默认的相关策略配置了它,但你没有在消息中提供各自的头信息。
从技术上讲,我看不出有什么理由在那边手动创建一个消息:简单的 return createItemProperty(getItemName, getItemId);
对你来说应该足够了。而框架会代你去创建消息,并复制各自的请求消息头。
如果你真的还认为你需要自己在那个变换中创建一个消息,那么你需要考虑到的是 copyHeaders()
关于 MessageBuilder
从请求报文中携带所需的序列细节头。