Spring Integration 如何根据消息动态设置本地目录/远程目录以发送到远程 sftp 或从中获取文件

问题描述 投票:0回答:1

我目前正在尝试更改一个使用 Spring 集成从 SFTP 服务器发送和接收文件的项目。

更改包括添加其他SFTP服务器,根据条件选择正确的服务器并以相同的方式处理文件。

我目前正在努力解决两件事。

第一个:

我有一个通道,其中消息包含带有远程目录的标头,并且我需要访问正确的 SFTP 会话。但为了进行会话,我需要正确的属性。 (config1或config2,在我的application.yml中定义) 我不确定如何将此信息传递给我的 ServiceActivator。 (我的代码中的第二个 TODO)

第二个:

我需要从多个 SFTP 服务器检索文件,并且需要将这些文件保存在多个本地目录中。远程和本地之间的路径不相同,并且在属性 config1 和 config2 中定义的方式与我在第一个问题中描述的方式相同。 我认为我以正确的方式委派 SFTP 会话,但我不知道如何根据 SFTP 会话设置 localDirectory。 (我的代码中的第一个 TODO)

如果有人可以帮助我一点,我将非常感激。

提前致谢。

这是迄今为止我的代码:

    SftpConfig.Sftp1 sftpConfig1;
    SftpConfig.Sftp2 sftpConfig2;

    @Bean
    @BridgeTo
    public MessageChannel uploadChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public ExpressionParser spelExpressionParser() {
        return new SpelExpressionParser();
    }

    public SessionFactory<ChannelSftp.LsEntry> getSftpSession(SftpConfig sftp) {

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftp.getHost());
        factory.setPort(sftp.getPort());
        factory.setUser(sftp.getUser());
        factory.setPassword(sftp.getPassword());
        factory.setAllowUnknownKeys(true);
        factory.setTimeout(sftp.getTimeout());
        log.info("timeout is set to: {}", sftp.getTimeout());

        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSf() {

        Map<Object, SessionFactory<ChannelSftp.LsEntry>> mapSession = new HashMap<>();
        mapSession.put("config1", getSftpSession(sftpConfig1));
        mapSession.put("config2", getSftpSession(sftpConfig2));

        SessionFactoryLocator<ChannelSftp.LsEntry> sessionFactoryLocator = new DefaultSessionFactoryLocator<>(mapSession);

        return new DelegatingSessionFactory<>(sessionFactoryLocator);
    }

    @Bean
    public RotatingServerAdvice advice() {

        List<RotationPolicy.KeyDirectory> keyDirectories = sftpConfig1.getCodes().stream()
                .map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
                .collect(Collectors.toList());

        keyDirectories.addAll(sftpConfig2.getCodes().stream()
                .map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
                .collect(Collectors.toList()));

        return new RotatingServerAdvice(delegatingSf(), keyDirectories);
    }

    @Bean
    public IntegrationFlow sftpIntegrationFlow() {
        return IntegrationFlows.from(
                        Sftp.inboundAdapter(delegatingSf())
                                .filter(new SftpSimplePatternFileListFilter("*.csv"))
                                .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                                .localFilter(new AbstractFileListFilter<File>() {
                                    @Override
                                    public boolean accept(final File file) {
                                        return file.getName().endsWith(".csv");
                                    }
                                })
                                .deleteRemoteFiles(false)
                                .temporaryFileSuffix(".new")
                                .localDirectory(new File()) // TODO dynamic local directory based on sftp session
                                .remoteDirectory("."),
                                    e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
                .log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
                .channel("filesReceptionChannel")
                .enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
                .get();
    }

    @Bean
    public MethodInterceptor logNoFileFoundAdvice() {
        return invocation -> {
            Object result = invocation.proceed();
            if (result == null) {
                log.info("[SFTP] No files found");
            }
            return result;
        };
    }

    @Bean
    public SftpRemoteFileTemplate sftpTemplate() {
        return new SftpRemoteFileTemplate(sftpSession());
    }

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSession() {
        return getSftpSession(); // TODO dynamic sftp session based on message received in serviceActivator bellow
    }

    @Bean
    @ServiceActivator(inputChannel = "uploadChannel")
    public MessageHandler uploadHandler() {
        return getFtpMessageHandler(sftpSession());
    }

    public MessageHandler getFtpMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSession) {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
        handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
        handler.setFileNameGenerator(message -> {
            if (message.getPayload() instanceof File) {
                return ((File) message.getPayload()).getName();
            } else {
                throw new IllegalArgumentException("File expected as payload.");
            }
        });
        handler.setUseTemporaryFileName(false);
        return handler;
    }
java spring-integration sftp
1个回答
0
投票

无法在

localDirectory
上更改
AbstractInboundFileSynchronizingMessageSource
。您可以考虑使用
localFilenameExpression()
动态构建目录。虽然
localDirectory
可能只是根 -
/

要发送到不同的 SFTP 服务器,您肯定必须使用

DelegatingSessionFactory
。请参阅文档中的更多信息:https://docs.spring.io/spring-integration/reference/sftp/dsf.html。还有:https://docs.spring.io/spring-integration/reference/handler-advice/context-holder.html

© www.soinside.com 2019 - 2024. All rights reserved.