Spring Integration 错误:上传文件时无法写入“foo/002.txt.writing”

问题描述 投票:0回答:1

我正在尝试使用 Spring Integration(SI)官方示例中的以下示例配置来了解其内部工作原理,目标是将其改造后用于更高层级环境中的一组类似任务。

DailyCashReportApplication.java

@Slf4j
@SpringBootApplication
@PropertySource(value = "classpath:feeds-config.yml", factory = com.pru.globalpayments.feeds.downstream.YamlPropertySourceFactory.class)
public class DailyCashReportApplication  implements CommandLineRunner  {
    
//  @Autowired
//  @Qualifier("syntheticRunner") //TODO: to be replaced by runners consuming from appropriate data sources
//  private AbstractRunner runner; 

    public static void main(String[] args) {
        log.info("DailyCashReportApplication running...");
    
        new SpringApplicationBuilder(DailyCashReportApplication.class).web(WebApplicationType.NONE).run(args);
        
    }

    @Override
    public void run(String... args) throws Exception {
        //runner.run(args);
    }

DcrConfig.java:

@Configuration
public class DcrConfig {

    // Name of the message header whose value afterMailAdvice appends to the original file's path.
    private final static String EMAIL_SUCCESS_SUFFIX = "emailSuccessSuffix";
    
    // SFTP connection and directory settings, all bound from the sftp.in.* properties.

    // Host used by both SFTP session factory beans in this class.
    @Value("${sftp.in.host}")
    @Getter
    private String host;

    // Port used by both SFTP session factory beans in this class.
    @Value("${sftp.in.port}")
    private int port;

    // User name used by both SFTP session factory beans in this class.
    @Value("${sftp.in.user}")
    private String user;

    // Resource location of the private key, resolved through resourceLoader.
    @Value("${sftp.in.privateKey}")
    @Getter
    private String privateKeyLocation;

    // NOTE(review): despite the name, fromFile() polls this path as a local java.io.File directory.
    @Value("${sftp.in.remoteDir}")
    @Getter
    private String remoteDir;



    // Directory the fileOut() adapter writes the per-line output files into.
    @Value("${sftp.in.localDir}")
    @Getter
    private String localDir;

    // NOTE(review): not read by any bean in this class.
    @Value("${sftp.in.chmod}")
    private String chmod;

    // Exposed through the generated getter only; not read inside this class.
    @Value("${sftp.in.maxFetchSize}")
    @Getter
    private int maxFetchSize;

    // NOTE(review): not read in this class; fromFile() hard-codes the "*.txt" pattern instead.
    @Value("${sftp.in.file.filter}")
    private String fileFilter;

    // Used to turn privateKeyLocation into a Resource for the session factories.
    @Autowired
    private ResourceLoader resourceLoader;
    
    /**
     * Builds the {@code downloadSftpSessionFactory} bean: a caching wrapper around a
     * {@link DefaultSftpSessionFactory} configured with the injected host, port, user
     * and private key, with unknown host keys allowed.
     *
     * @return caching SFTP session factory
     * @throws IOException declared by the signature; kept for compatibility with callers
     */
    @Bean(name = "downloadSftpSessionFactory")
    public SessionFactory<LsEntry> sftpSessionFactory() throws IOException {
        final Resource privateKey = resourceLoader.getResource(privateKeyLocation);

        final DefaultSftpSessionFactory delegate = new DefaultSftpSessionFactory();
        delegate.setHost(host);
        delegate.setPort(port);
        delegate.setUser(user);
        delegate.setPrivateKey(privateKey);
        delegate.setAllowUnknownKeys(true);

        final SessionFactory<LsEntry> caching = new CachingSessionFactory<>(delegate);
        return caching;
    }

    
    
    /**
     * Inbound flow: polls the {@code remoteDir} directory for {@code *.txt} files,
     * passes each file through a file splitter and routes the resulting payloads by class.
     *
     * <p>{@code FileSplitter.FileMarker} payloads go to {@code markers.input} and
     * {@code String} payloads go to {@code lines.input}.
     *
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow fromFile() {
        // NOTE(review): remoteDir is read here as a local directory (new File(...)), not over SFTP.
        return IntegrationFlows.from(Files.inboundAdapter(new File(remoteDir))
                                        // NOTE(review): with duplicate prevention off, presumably the same
                                        // file is picked up again on every poll - confirm this is intended.
                                        .preventDuplicates(false)
                                        .patternFilter("*.txt"), 
                // Poll with a fixed delay of 5000 (presumably milliseconds); the endpoint id is the one
                // referenced by the test's noAutoStartup setting.
                e -> e.poller(Pollers.fixedDelay(5000))
                                        .id("fileInboundChannelAdapter"))
                // NOTE(review): the two boolean flags presumably enable iterator mode and marker
                // messages - the router below expects FileMarker payloads; confirm against the API.
                .handle(Files.splitter(true, true))
                // Route on the payload's runtime class.
                .<Object, Class<?>>route(Object::getClass, m->m
                        .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                        .channelMapping(String.class, "lines.input"))
                .get();
    }
    
    /**
     * File-writing adapter spec used for the split lines.
     *
     * <p>The target directory is {@code localDir}, wrapped in single quotes so that it is
     * passed as a literal expression. The file name is characters 1 to 3 of the payload
     * followed by {@code .txt}, and a newline is appended after each write.
     *
     * @return the outbound file adapter spec
     */
    @Bean
    public FileWritingMessageHandlerSpec fileOut() {
        String directoryExpression = "'" + localDir + "'";
        return Files.outboundAdapter(directoryExpression)
                .fileNameExpression("payload.substring(1,4) + '.txt'")
                .appendNewLine(true);
    }
    
    /**
     * Flow that hands every message it receives to the injected file-writing handler.
     *
     * @param fileOut handler that writes each payload to its output file
     * @return the line-writing integration flow
     */
    @Bean
    public IntegrationFlow lines(FileWritingMessageHandler fileOut) {
        return flow -> {
            flow.handle(fileOut);
        };
    }
    

    
    /**
     * Flow for file-marker messages: keeps only END markers and then publishes each one
     * to four subscribers - one that triggers a flush on {@code fileOut}, and three that
     * upload a fixed output file (002, 006, 009) to the remote directory {@code foo} over SFTP.
     *
     * <p>NOTE(review): none of the SFTP outbound adapters sets {@code autoCreateDirectory(true)}.
     * If the remote directory {@code foo} does not exist, the upload fails with
     * "Failed to write to 'foo/002.txt.writing'" / "No such file".
     *
     * @return the marker-handling integration flow
     * @throws IOException declared by the signature; see the notes on the catch blocks below
     */
    @Bean
    public IntegrationFlow markers() throws IOException{
        // Drop every marker except END.
        return f -> f.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END),
                        e -> e.id("markerFilter"))
                .publishSubscribeChannel(s -> s

                        // First subscriber: replace the payload with a literal file-name pattern and
                        // send it as a trigger to the "fileOut" handler so pending writes are flushed.
                        .subscribe(sf -> sf.transform("'tmp/out/.*\\.txt'", e -> e.id("toTriggerPattern"))
                                .trigger("fileOut", e -> e.id("flusher")))

                        // Second subscriber: upload the first file (relative path tmp/out/002.txt),
                        // overriding the file-name header with "002.txt".
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("tmp/out/002.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "002.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp002"));
                            } catch (IOException e1) {
                                // NOTE(review): the exception is only printed; if ftp() throws, this
                                // subflow is left without its SFTP handler and startup continues.
                                e1.printStackTrace();
                            }
                        })

                        // Third subscriber: upload the second file.
                        // NOTE(review): this path is absolute (/tmp/out/...) while 002 uses a relative
                        // path (tmp/out/...) - looks inconsistent; confirm which one is intended.
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/006.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "006.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp006"));
                            } catch (IOException e1) {
                                // NOTE(review): exception is printed and otherwise ignored.
                                e1.printStackTrace();
                            }
                        })

                        // Fourth subscriber: upload the third file (absolute path, same caveat as 006).
                        .subscribe(sf -> {
                            try {
                                sf.<FileSplitter.FileMarker, File>transform(p -> new File("/tmp/out/009.txt"))
                                        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "009.txt", true))
                                        .handle(Sftp.outboundAdapter(ftp()).remoteDirectory("foo"), e -> e.id("ftp009"));
                            } catch (IOException e1) {
                                // NOTE(review): exception is printed and otherwise ignored.
                                e1.printStackTrace();
                            }
                        })


                        );
    }
    

    
    

    

    
    /**
     * SFTP session factory used by the outbound (upload) adapters in {@code markers()}.
     *
     * <p>Configured from the same host, port, user and private key as
     * {@code downloadSftpSessionFactory}, with unknown host keys allowed, and wrapped in
     * a caching session factory.
     *
     * @return caching SFTP session factory
     * @throws IOException declared by the signature; kept for compatibility with callers
     */
    @Bean
    public SessionFactory<LsEntry> ftp() throws IOException {
        Resource keyResource = resourceLoader.getResource(privateKeyLocation);

        DefaultSftpSessionFactory sftp = new DefaultSftpSessionFactory();
        sftp.setHost(host);
        sftp.setPort(port);
        sftp.setUser(user);
        sftp.setPrivateKey(keyResource);
        sftp.setAllowUnknownKeys(true);

        return new CachingSessionFactory<>(sftp);
    }
    
    /**
     * Advice that renames the message's original file once the advised call has run.
     *
     * <p>The first invocation argument is taken as the message. After a successful
     * {@code proceed()} the original file is renamed to its absolute path plus the value of
     * the {@code emailSuccessSuffix} header. If an {@link Exception} is thrown inside the
     * try block, the file is instead renamed with that same suffix followed by
     * {@code email.failed}. The advice always returns {@code null}.
     *
     * @return the method interceptor
     */
    @Bean
    public MethodInterceptor afterMailAdvice() {
        return call -> {
            Message<?> inbound = (Message<?>) call.getArguments()[0];
            MessageHeaders inboundHeaders = inbound.getHeaders();
            File source = inboundHeaders.get(FileHeaders.ORIGINAL_FILE, File.class);
            try {
                call.proceed();
                String successPath = source.getAbsolutePath() + inboundHeaders.get(EMAIL_SUCCESS_SUFFIX);
                source.renameTo(new File(successPath));
            }
            catch (Exception failure) {
                String failurePath = source.getAbsolutePath() + inboundHeaders.get(EMAIL_SUCCESS_SUFFIX) + "email.failed";
                source.renameTo(new File(failurePath));
            }
            return null;
        };
    }
    
}

application-dev.yml

spring:
  application:
    name: daily-cash-report-application

sftp:
  in:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/in 
    tmpDir: 
    localDir: tmp/out 
    maxFetchSize: 1    
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664 
    poller:
      fixedDelay: 10000
    file:
      filter: A.B.*    #TODO: calibrate once spec is known
  out:
    host: eafdev
    port: 22
    user: eafdev
    remoteDir: tmp/out  
    privateKey: file:///C:/Users/x12345/AppData/Roaming/SSH/UserKeys/distributessh
    chmod: 664 

file: 
  out:   
    targetDir: 
    tmpDir:

DailyCashReportApplicationTest.java

/**
 * Integration test that drops a three-line input file into {@code tmp/in} and waits
 * for {@code tmp/out/002.txt} to appear.
 *
 * <p>NOTE(review): {@code fileInboundChannelAdapter} is listed in {@code noAutoStartup} and
 * this test never starts it explicitly - confirm how the inbound flow gets triggered.
 */
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@SpringIntegrationTest(noAutoStartup = "fileInboundChannelAdapter")
@SpringJUnitConfig(classes = { 
        DailyCashReportApplication.class,
        DcrConfig.class,
        })
@EnableConfigurationProperties
@PropertySource(value = "application-dev.yml", factory = YamlPropertySourceFactory.class)
@Slf4j
class DailyCashReportApplicationTest {

    /**
     * Writes the input under a temporary name, renames it to {@code foo.txt} so the
     * {@code *.txt} filter only sees the finished file, then polls for the first output file.
     */
    @Test
    @SneakyThrows
    void test()  {

        File in = new File("tmp/in/", "foo");
        // try-with-resources: the stream is closed even if write() throws, so the handle
        // cannot leak and block the rename that follows.
        try (FileOutputStream fos = new FileOutputStream(in)) {
            fos.write("*002,foo,bar\n*006,baz,qux\n*009,fiz,buz\n".getBytes());
        }
        in.renameTo(new File("tmp/in/", "foo.txt"));

        File out = new File("tmp/out/002.txt");
        int n = 0;
        // Wait up to 100 x 100 ms until the output exists and holds at least 12 bytes.
        while (n++ < 100 && (!out.exists() || out.length() < 12)) {
            Thread.sleep(100);
        }
        assertThat(out.exists()).isTrue();

    }

}

无论是通过 *ApplicationTest.java 运行,还是在 IDE 中执行上述 *Application.java 并把内容相同的 foo.txt 放入预先创建的 `tmp/in` 文件夹,我看到的行为都只是部分正确:`tmp\in\foo.txt` 会被拾取,并在 `tmp/out` 中生成 3 个文件,但同时伴有以下错误:

2023-09-20 11:40:21.384 [scheduling-1] ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessageDeliveryException: Error handling message for file [C:\code\STS\daily-cash-report\tmp\out\002.txt -> 002.txt]; nested exception is org.springframework.messaging.MessagingException: Failed to write to 'foo/002.txt.writing' while uploading the file; nested exception is java.io.IOException: failed to write file, failedMessage=GenericMessage [payload=tmp\out\002.txt, headers={file_lineCount=3, file_name=002.txt, file_originalFile=tmp\in\foo.txt, id=365e346c-78d4-47ab-d6c0-e4f0f5f72f43, file_marker=END, file_relativePath=foo.txt, timestamp=1695224420443}]
    at org.springframework.integration.file.remote.RemoteFileTemplate.doSend(RemoteFileTemplate.java:368)
    at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$send$0(RemoteFileTemplate.java:314)
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:452)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:314)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:302)
    at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:294)
    at org.springframework.integration.file.remote.handler.FileTransferringMessageHandler.handleMessageInternal(FileTransferringMessageHandler.java:207)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:317)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:474)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:460)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:347)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:340)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.messaging.MessagingException: Failed to write to 'foo/002.txt.writing' while uploading the file; nested exception is java.io.IOException: failed to write file
    at org.springframework.integration.file.remote.RemoteFileTemplate.sendFileToRemoteDirectory(RemoteFileTemplate.java:573)
    at org.springframework.integration.file.remote.RemoteFileTemplate.doSend(RemoteFileTemplate.java:353)
    ... 127 more
Caused by: java.io.IOException: failed to write file
    at org.springframework.integration.sftp.session.SftpSession.write(SftpSession.java:175)
    at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.write(CachingSessionFactory.java:237)
    at org.springframework.integration.file.remote.RemoteFileTemplate.doSend(RemoteFileTemplate.java:582)
    at org.springframework.integration.file.remote.RemoteFileTemplate.sendFileToRemoteDirectory(RemoteFileTemplate.java:570)
    ... 128 more
Caused by: 2: No such file
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2873)
    at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:594)
    at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:540)
    at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:492)
    at org.springframework.integration.sftp.session.SftpSession.write(SftpSession.java:172)
    ... 131 more

要点似乎是:

Failed to write to 'foo/002.txt.writing' while uploading the file; nested exception is java.io.IOException: failed to write file, failedMessage=GenericMessage [payload=tmp\out\002.txt, headers={file_lineCount=3, file_name=002.txt, file_originalFile=tmp\in\foo.txt, id=365e346c-78d4-47ab-d6c0-e4f0f5f72f43, file_marker=END, file_relativePath=foo.txt, timestamp=1695224420443}]

每当 `tmp\in` 文件夹中出现一个全新的文件时,就会抛出该错误,之后在轮询器每次执行时重复出现;与此同时,该文件仍被正确拆分为 3 个结果文件:`002.txt`、`006.txt`、`009.txt`,如上文所述。

我知道 `.writing` 这个临时后缀(文档中有说明),但这似乎不是预期的行为。在研究该 SI 示例时,我想弄清楚为什么会发生此错误,以便在把应用程序从这个实验环境迁移到真实环境时能够消除它。

java spring-boot spring-integration messaging spring-integration-dsl
1个回答
0
投票

失败的原因是:远程服务器上不存在用于上传文件的 `foo` 目录。

请参见 `autoCreateDirectory(true)` 选项:

/**
 * A {@code boolean} flag to indicate automatically create the directory or not.
 * @param autoCreateDirectory true to automatically create the directory.
 * @return the current Spec
 */
public S autoCreateDirectory(boolean autoCreateDirectory) {

该选项在 `RemoteFileTemplate` 上默认为 `false`。

因此你的配置必须是这样的:

Sftp.outboundAdapter(ftp()).remoteDirectory("foo").autoCreateDirectory(true)
© www.soinside.com 2019 - 2024. All rights reserved.