我目前正在尝试更改一个使用 Spring 集成从 SFTP 服务器发送和接收文件的项目。
更改包括添加其他SFTP服务器,根据条件选择正确的服务器并以相同的方式处理文件。
我目前正在努力解决两件事。
第一个:
我有一个通道,其中消息包含带有远程目录的标头,并且我需要访问正确的 SFTP 会话。但为了进行会话,我需要正确的属性。 (config1或config2,在我的application.yml中定义) 我不确定如何将此信息传递给我的 ServiceActivator。 (我的代码中的第二个 TODO)
第二个:
我需要从多个 SFTP 服务器检索文件,并且需要将这些文件保存在多个本地目录中。远程和本地之间的路径不相同,并且在属性 config1 和 config2 中定义的方式与我在第一个问题中描述的方式相同。 我认为我以正确的方式委派 SFTP 会话,但我不知道如何根据 SFTP 会话设置 localDirectory。 (我的代码中的第一个 TODO)
如果有人可以帮助我一点,我将非常感激。
提前致谢。
这是迄今为止我的代码:
SftpConfig.Sftp1 sftpConfig1;
SftpConfig.Sftp2 sftpConfig2;
@Bean
@BridgeTo
public MessageChannel uploadChannel() {
return new PublishSubscribeChannel();
}
@Bean
public ExpressionParser spelExpressionParser() {
return new SpelExpressionParser();
}
public SessionFactory<ChannelSftp.LsEntry> getSftpSession(SftpConfig sftp) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftp.getHost());
factory.setPort(sftp.getPort());
factory.setUser(sftp.getUser());
factory.setPassword(sftp.getPassword());
factory.setAllowUnknownKeys(true);
factory.setTimeout(sftp.getTimeout());
log.info("timeout is set to: {}", sftp.getTimeout());
return new CachingSessionFactory<>(factory);
}
@Bean
public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSf() {
Map<Object, SessionFactory<ChannelSftp.LsEntry>> mapSession = new HashMap<>();
mapSession.put("config1", getSftpSession(sftpConfig1));
mapSession.put("config2", getSftpSession(sftpConfig2));
SessionFactoryLocator<ChannelSftp.LsEntry> sessionFactoryLocator = new DefaultSessionFactoryLocator<>(mapSession);
return new DelegatingSessionFactory<>(sessionFactoryLocator);
}
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = sftpConfig1.getCodes().stream()
.map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
.collect(Collectors.toList());
keyDirectories.addAll(sftpConfig2.getCodes().stream()
.map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
.collect(Collectors.toList()));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
@Bean
public IntegrationFlow sftpIntegrationFlow() {
return IntegrationFlows.from(
Sftp.inboundAdapter(delegatingSf())
.filter(new SftpSimplePatternFileListFilter("*.csv"))
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localFilter(new AbstractFileListFilter<File>() {
@Override
public boolean accept(final File file) {
return file.getName().endsWith(".csv");
}
})
.deleteRemoteFiles(false)
.temporaryFileSuffix(".new")
.localDirectory(new File()) // TODO dynamic local directory based on sftp session
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
.log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
.channel("filesReceptionChannel")
.enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
.get();
}
@Bean
public MethodInterceptor logNoFileFoundAdvice() {
return invocation -> {
Object result = invocation.proceed();
if (result == null) {
log.info("[SFTP] No files found");
}
return result;
};
}
@Bean
public SftpRemoteFileTemplate sftpTemplate() {
return new SftpRemoteFileTemplate(sftpSession());
}
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSession() {
return getSftpSession(); // TODO dynamic sftp session based on message received in serviceActivator bellow
}
@Bean
@ServiceActivator(inputChannel = "uploadChannel")
public MessageHandler uploadHandler() {
return getFtpMessageHandler(sftpSession());
}
public MessageHandler getFtpMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSession) {
SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof File) {
return ((File) message.getPayload()).getName();
} else {
throw new IllegalArgumentException("File expected as payload.");
}
});
handler.setUseTemporaryFileName(false);
return handler;
}
无法在
localDirectory
上更改 AbstractInboundFileSynchronizingMessageSource
。您可以考虑使用 localFilenameExpression()
动态构建目录。虽然 localDirectory
可能只是根 - /
。
要发送到不同的 SFTP 服务器,您肯定必须使用
DelegatingSessionFactory
。请参阅文档中的更多信息:https://docs.spring.io/spring-integration/reference/sftp/dsf.html。还有:https://docs.spring.io/spring-integration/reference/handler-advice/context-holder.html