确定 FileWrittenEvent 何时完成整个文件的写入

问题描述 投票:0回答:2

这是我的第一个问题,所以请耐心等待。
在 Spring 5.2 的最新版本中,Spring Integration 中添加了某些非常有用的组件,如以下链接所示:
https://docs.spring.io/ spring-integration/reference/html/sftp.html#sftp-server-events
Apache MINA 与新的侦听器“ApacheMinaSftpEventListener”集成,其中

侦听某些 Apache Mina SFTP 服务器事件并将其发布为 ApplicationEvents

到目前为止,我的应用程序可以捕获所提供链接的文档中所述的应用程序事件,但我似乎无法弄清楚事件何时完成......如果这有意义(可能没有)。

在流程中应用程序启动并在指定端口上作为 SFTP 服务器激活。
我可以使用用户名和密码连接到启动传输的系统并将文件“放置”到系统上。
当我登录时,我可以捕获“SessionOpenedEvent”
当我传输文件时,我可以捕获“FileWrittenEvent”
当我注销或断开连接时,我可以捕获“SessionClosedEvent”
当文件较大时,我可以捕获所有“FileWrittenEvent”事件这告诉我传输发生在预定或计算大小的缓冲区的流上。

我试图确定的是“如何知道该流何时完成”。这将帮助我回答“作为接受文件的 SFTP 服务器,我什么时候可以访问已完成的文件?”

我的监听器 bean(通过 SubSystemFactory 启动时附加到 Apache Mina)

@Configuration
public class SftpConfiguration {
    @Bean
    public ApacheMinaSftpEventListener apacheMinaSftpEventListener() {
        return new ApacheMinaSftpEventListener();
    }   
}
SftpSubsystemFactory subSystem = new SftpSubsystemFactory();
subSystem.addSftpEventListener(listener);

我的事件监听器:这是在这里,所以我可以在记录器中看到一些输出,当我意识到在几 GB 文件上时,FileWrittenEvent 变得有点疯狂。

@Async
@EventListener
public void sftpEventListener(ApacheMinaSftpEvent sftpEvent) {
    log.info("Capturing Event: ", sftpEvent.getClass().getSimpleName());
    log.info("Event Details: ", sftpEvent.toString());
}

这几部分是我开始捕获事件所需的全部

我想我需要重写一个方法来帮助我在流完成时捕获,以便我可以继续我的业务逻辑,但我不确定
我似乎能够在流完成之前访问文件(读/写),所以我似乎无法使用尝试“移动”文件并等待它抛出的逻辑一个错误,尽管这种方法看起来像对我来说不好的做法。

任何指导将不胜感激,谢谢。

版本控制信息

  • 春季5.2.3
  • Spring Boot 2.2.3
  • 阿帕奇米娜 2.1.3
  • Java 1.8
spring-boot apache-mina spring-integration-sftp
2个回答
0
投票

这可能对其他人没有帮助,但我通过将相关解决方案与此答案中找到的新 Apache MINA 类相结合,找到了解决我最初问题的方法:
https://stackoverflow.com/a/45513680/ 12806809
我的解决方案:
创建一个扩展新 ApacheMinaSftpEventListener 的类,同时重写“打开”和“关闭”方法,以确保我的 SFTP 服务器业务逻辑知道文件何时完成写入。

public class WatcherSftpEventListener extends ApacheMinaSftpEventListener {
    ...
    ...
    @Override public void open(ServerSession session, String remoteHandle, Handle localHandle) throws IOException {
        File file = localHandle.getFile().toFile();
        if (file.isFile() && file.exists()) {
            log.debug("File Open: {}", file.toString());
        }
        // Keep around the super call for now
        super.open(session, remoteHandle, localHandle);
    }

    @Override
    public void close(ServerSession session, String remoteHandle, Handle localHandle) {
        File file = localHandle.getFile().toFile();
        if (file.isFile() && file.exists()) {
            log.debug("RemoteHandle: {}", remoteHandle);
            log.debug("File Closed: {}", file.toString());
            for (SftpFileUploadCompleteListener listener : fileReadyListeners) {
                try {
                    listener.onFileReady(file);
                } catch (Exception e) {
                    String msg = String.format("File '%s' caused an error in processing '%s'", file.getName(), e.getMessage());
                    log.error(msg);
                    try {
                        session.disconnect(0, msg);
                    } catch (IOException io) {
                        log.error("Could not properly disconnect from session {}; closing future state", session);
                        session.close(false);
                    }
                }
            }
        }
        // Keep around the super call for now
        super.close(session, remoteHandle, localHandle);
    }

}

当我启动 SSHD 服务器时,我将新的侦听器 bean 添加到 SftpSubsystemFactory,它使用自定义的事件处理程序类对传入文件应用我的业务逻辑。

        watcherSftpEventListener.addFileReadyListener(new SftpFileUploadCompleteListener() {
            @Override
            public void onFileReady(File file) throws Exception {
                new WatcherSftpEventHandler(file, properties.getSftphost());
            }
        });
        subSystem.addSftpEventListener(watcherSftpEventListener);

这个解决方案还有更多内容,但由于这个问题没有得到那么多流量,而且它比现在更适合我的参考和学习,除非有人要求,否则我不会提供更多内容。


0
投票

使用 Spring Boot 集成的潜在解决方案:

首先将

ApacheMinaSftpEventListener
注入SSHD Mina

// Listener is an instance of ApacheMinaSftpEventListener
var sftp = new SftpSubsystemFactory();
sftp.addSftpEventListener(listener);

然后,实现一个监听

ApacheMinaSftpEvent
的组件,如下所示:

@Component
public class UploadEventListener {

    private final Map<ServerSession, Set<Path>> files = new ConcurrentHashMap<>();

    @EventListener
    void onSessionOpen(SessionOpenedEvent event) {
        System.out.println("Upload starting");
    }

    @EventListener
    void onReceiveData(FileWrittenEvent event) {
        var file = event.getFile();
        var set = files.computeIfAbsent(event.getSession(), s -> new HashSet<>());
        set.add(file);
    }

    @EventListener
    void onSessionClosed(SessionClosedEvent event) {
        System.out.println("Upload done");
        var set = files.remove(event.getSession());
        System.out.println(set);
    }
}

© www.soinside.com 2019 - 2024. All rights reserved.