我正在实现以下一个Spring Integration工作流程。
IntegrationFlows.from("inputFileProcessorChannel")
.split(fileSplitterSpec, spec -> {})
.transform(lineItemTransformer)
.handle(httpRequestExecutingMessageHandler)
.transform(reportDataAggregator)
.aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
.channel("reportGeneratorChannel")
.get();
现在,完成上述流程后,我需要将input file
移至存档目录。决定目标目录的决定基于消息头processingFailed
,并且在流程的.transform(reportDataAggregator)
步骤中添加此头。要移动此文件,我创建了另一个流程,如下面的代码所示
IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
.routeToRecipients(routerSpec -> {
routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
.recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
})
.get();
选择器方法
private MessageSelector createMessageSelector(Boolean ruleBoolean) {
return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}
下面的报告频道流
IntegrationFlows.from("reportGeneratorChannel")
.transform(executionReportTransformer)
.handle(reportWritingMessageHandlerSpec)
.get();
但是,正如该流程所预期的那样,由于在流执行中不存在所述头,因此文件移动未完成。
因此,如何在创建报告文件后实现执行file mover flow
的目标?
FileSplitter
为我们填充要产生的每一行的标题:
@Override
protected boolean willAddHeaders(Message<?> message) {
Object payload = message.getPayload();
return payload instanceof File || payload instanceof String;
}
@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
File file = null;
if (message.getPayload() instanceof File) {
file = (File) message.getPayload();
}
else if (message.getPayload() instanceof String) {
file = new File((String) message.getPayload());
}
if (file != null) {
if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
headers.put(FileHeaders.ORIGINAL_FILE, file);
}
if (!headers.containsKey(FileHeaders.FILENAME)) {
headers.put(FileHeaders.FILENAME, file.getName());
}
}
}
因此,即使您已完成聚合并准备向该.channel("reportGeneratorChannel")
发送消息,您仍然可以访问那些与文件相关的标头。
将此reportGeneratorChannel
设置为PublishSubscribeChannel
,然后将“文件移动器流”移到那里,将为您解决问题。
顺便说一句:到目前为止,在同一个通道上使用IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
和第二个流时,您将进行循环调度。那不是发布订阅分发。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel