FTP入站通道适配器

FTP入站通道适配器是一个特殊的监听器,它连接到FTP服务器并监听远程目录事件(例如,创建新文件),然后启动文件传输。 以下示例展示了如何配置`inbound-channel-adapter`:

<int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    remote-directory="some/remote/path"
    remote-file-separator="/"
    preserve-timestamp="true"
    local-filename-generator-expression="#this.toUpperCase() + '.a'"
    scanner="myDirScanner"
    local-filter="myFilter"
    temporary-file-suffix=".writing"
    max-fetch-size="-1"
    local-directory=".">
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

如前面的配置所示,您可以通过使用`inbound-channel-adapter`元素并为各种属性(例如`local-directory`、filename-pattern(基于简单模式匹配,而不是正则表达式)以及对`session-factory`的引用)提供值来配置FTP入站通道适配器。 默认情况下,传输的文件与原始文件具有相同的名称。 如果您想覆盖此行为,可以设置`local-filename-generator-expression`属性,该属性允许您提供一个SpEL表达式来生成本地文件的名称。 与出站网关和适配器不同,出站网关和适配器中SpEL评估上下文的根对象是`Message`,此入站适配器在评估时还没有消息,因为它最终会生成以传输文件作为其有效负载的消息。 因此,SpEL评估上下文的根对象是远程文件的原始名称(一个`String`)。 入站通道适配器首先检索本地目录的`File`对象,然后根据轮询器配置发出每个文件。 从5.0版本开始,当需要检索新文件时,您现在可以限制从FTP服务器获取的文件数量。 当目标文件非常大或在具有持久文件列表过滤器的集群系统中运行时,这可能很有益,稍后将讨论。 为此目的使用`max-fetch-size`。 负值(默认值)表示没有限制,并且检索所有匹配的文件。 有关更多信息,请参阅入站通道适配器:控制远程文件获取。 从5.0版本开始,您还可以通过设置`scanner`属性向`inbound-channel-adapter`提供自定义的`DirectoryScanner`实现。 从Spring Integration 3.0开始,您可以指定`preserve-timestamp`属性(其默认值为`false`)。 当为`true`时,本地文件的修改时间戳被设置为从服务器检索到的值。 否则,它被设置为当前时间。 从4.2版本开始,您可以指定`remote-directory-expression`而不是`remote-directory`,允许您在每次轮询时动态确定目录——例如,remote-directory-expression="@myBean.determineRemoteDir()"。 从4.3版本开始,您可以省略`remote-directory`和`remote-directory-expression`属性。 它们默认为`null`。 在这种情况下,根据FTP协议,客户端工作目录用作默认远程目录。 有时,基于`filename-pattern`属性指定的简单模式进行文件过滤可能不足。 如果出现这种情况,您可以使用`filename-regex`属性来指定正则表达式(例如`filename-regex=".*\.test$"`)。 此外,如果您需要完全控制,可以使用`filter`属性并提供对`o.s.i.file.filters.FileListFilter`的任何自定义实现的引用,这是一个用于过滤文件列表的策略接口。 此过滤器确定检索哪些远程文件。 您还可以通过使用`CompositeFileListFilter`将基于模式的过滤器与其他过滤器(例如`AcceptOnceFileListFilter`以避免同步以前已获取的文件)结合使用。 AcceptOnceFileListFilter`将其状态存储在内存中。 如果您希望状态在系统重启后仍然存在,请考虑改用`FtpPersistentAcceptOnceFileListFilter。 此过滤器将接受的文件名存储在`MetadataStore`策略的实例中(参见元数据存储)。 此过滤器匹配文件名和远程修改时间。 从4.0版本开始,此过滤器需要一个`ConcurrentMetadataStore`。 当与共享数据存储(例如`Redis`与`RedisMetadataStore`)一起使用时,它允许在多个应用程序或服务器实例之间共享过滤键。 从5.0版本开始,默认情况下,对于`FtpInboundFileSynchronizer`,会应用带有内存中`SimpleMetadataStore`的`FtpPersistentAcceptOnceFileListFilter`。 此过滤器还应用于XML配置中的`regex`或`pattern`选项,以及Java DSL中的`FtpInboundChannelAdapterSpec`。 任何其他用例都可以通过`CompositeFileListFilter`(或`ChainFileListFilter`)进行管理。 前面的讨论指的是在检索文件之前过滤文件。 一旦文件被检索,一个额外的过滤器将应用于文件系统上的文件。 默认情况下,这是一个`AcceptOnceFileListFilter`,如前所述,它在内存中保留状态,并且不考虑文件的修改时间。 除非您的应用程序在处理后删除文件,否则适配器默认会在应用程序重启后重新处理磁盘上的文件。 此外,如果您将`filter`配置为使用`FtpPersistentAcceptOnceFileListFilter`并且远程文件时间戳发生变化(导致其被重新获取),则默认的本地过滤器不允许处理此新文件。 有关此过滤器及其使用方式的更多信息,请参阅远程持久文件列表过滤器。 您可以使用`local-filter`属性配置本地文件系统过滤器的行为。 从4.3.8版本开始,默认配置`FileSystemPersistentAcceptOnceFileListFilter`。 此过滤器将接受的文件名和修改时间戳存储在`MetadataStore`策略的实例中(参见元数据存储),并检测本地文件修改时间的变化。 默认的`MetadataStore`是一个`SimpleMetadataStore`,它将状态存储在内存中。 从4.1.5版本开始,这些过滤器有一个新属性(flushOnUpdate),它使它们在每次更新时刷新 元数据存储(如果存储实现了`Flushable`)。

此外,如果您使用分布式`MetadataStore`(例如Redis),您可以拥有同一适配器或应用程序的多个实例,并确保每个文件只处理一次。

实际的本地过滤器是一个`ChainFileListFilter`,它包含一个模式过滤器,该过滤器阻止处理正在下载的文件(基于`temporary-file-suffix`),以及提供的过滤器。 文件下载时带有此后缀(默认值为`.writing`),并在传输完成后将文件重命名为最终名称,使其对过滤器“可见”。 remote-file-separator`属性允许您配置文件分隔符字符,以防默认的'/'不适用于您的特定环境。 有关这些属性的更多详细信息,请参阅schema。 您还应该了解FTP入站通道适配器是一个轮询消费者。 因此,您必须配置一个轮询器(通过使用全局默认值或本地子元素)。 一旦文件被传输,一个以`java.io.File`作为其有效负载的消息将被生成并发送到由`channel`属性标识的通道。 从6.2版本开始,您可以使用`FtpLastModifiedFileListFilter`根据上次修改策略过滤FTP文件。 此过滤器可以使用`age`属性进行配置,以便只有早于此值的文件才通过过滤器。 默认年龄为60秒,但您应该选择一个足够大的年龄,以避免过早地获取文件(例如,由于网络故障)。 有关更多信息,请查阅其Javadoc。 相比之下,从6.5版本开始,引入了`FtpRecentFileListFilter,仅接受不早于给定`age`的文件。

更多关于文件过滤和不完整文件

有时,刚刚出现在受监控(远程)目录中的文件是不完整的。 通常,此类文件以临时扩展名(例如`somefile.txt.writing`)写入,并在写入过程完成后重命名。 在大多数情况下,您只对完整的文件感兴趣,并希望只过滤完整的文件。 为了处理这些情况,您可以使用`filename-pattern`、`filename-regex`和`filter`属性提供的过滤支持。 以下示例使用自定义过滤器实现:

<int-ftp:inbound-channel-adapter
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    filter="customFilter"
    local-directory="file:/my_transfers">
    remote-directory="some/remote/path"
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean id="customFilter" class="org.example.CustomFilter"/>

入站FTP适配器的轮询器配置注意事项

入站FTP适配器的工作包括两个任务:

  1. 与远程服务器通信,以便将文件从远程目录传输到本地目录。

  2. 对于每个传输的文件,生成一个以该文件作为有效负载的消息,并将其发送到由“channel”属性标识的通道。 这就是为什么它们被称为“通道适配器”而不是仅仅是“适配器”。 这种适配器的主要工作是生成消息以发送到消息通道。 本质上,第二个任务优先,如果您的本地目录中已经有一个或多个文件,它会首先从这些文件生成消息。 只有当所有本地文件都已处理完毕后,它才会启动远程通信以检索更多文件。

此外,在轮询器上配置触发器时,您应该密切关注`max-messages-per-poll`属性。 对于所有`SourcePollingChannelAdapter`实例(包括FTP),其默认值为`1`。 这意味着,一旦处理完一个文件,它就会等待由您的触发器配置确定的下一次执行时间。 如果您碰巧在`local-directory`中有一个或多个文件,它会在启动与远程FTP服务器的通信之前处理这些文件。 此外,如果`max-messages-per-per-poll`设置为`1`(默认值),它会一次处理一个文件,间隔由您的触发器定义,本质上是“一次轮询 === 一个文件”的工作方式。

对于典型的文件传输用例,您很可能希望相反的行为:在每次轮询时处理所有可能的文件,然后才等待下一次轮询。 如果是这种情况,请将`max-messages-per-poll`设置为-1。 然后,在每次轮询时,适配器会尝试生成尽可能多的消息。 换句话说,它会处理本地目录中的所有内容,然后连接到远程目录,将其中所有可用的内容传输到本地进行处理。 只有这样,轮询操作才被认为是完整的,轮询器会等待下一次执行时间。

您也可以将“max-messages-per-poll”值设置为一个正值,该值表示每次轮询从文件中创建的消息的上限。 例如,值为`10`意味着在每次轮询时,它尝试处理不超过十个文件。

从故障中恢复

了解适配器的架构很重要。 有一个文件同步器负责获取文件,还有一个`FileReadingMessageSource`负责为每个同步文件发出消息。 如前所述,涉及两个过滤器。 `filter`属性(和模式)指的是远程(FTP)文件列表,以避免获取已经获取的文件。 `local-filter`由`FileReadingMessageSource`用于确定哪些文件要作为消息发送。

同步器列出远程文件并咨询其过滤器。 然后传输文件。 如果在文件传输期间发生IO错误,任何已添加到过滤器的文件都将被删除,以便它们可以在下一次轮询时重新获取。 这仅适用于过滤器实现了`ReversibleFileListFilter`(例如`AcceptOnceFileListFilter`)的情况。

如果在同步文件后,下游流处理文件时发生错误,则不会自动回滚过滤器,因此失败的文件默认不会重新处理。

如果您希望在失败后重新处理此类文件,可以使用类似于以下配置来促进从过滤器中删除失败的文件:

<int-ftp:inbound-channel-adapter id="ftpAdapter"
        session-factory="ftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/ftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-ftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

前面的配置适用于任何`ResettableFileListFilter`。

从5.0版本开始,入站通道适配器可以在本地构建与生成的本地文件名对应的子目录。 这也可以是远程子路径。 为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以为内部`FileReadingMessageSource`提供一个基于`Files.walk()算法的新`RecursiveDirectoryScanner。 有关更多信息,请参见AbstractInboundFileSynchronizingMessageSource.setScanner()。 此外,您现在可以通过使用`setUseWatchService()选项将`AbstractInboundFileSynchronizingMessageSource`切换到基于`WatchService`的`DirectoryScanner。 它还配置了所有`WatchEventType`实例,以对本地目录中的任何修改做出反应。 前面显示的重新处理示例基于`FileReadingMessageSource.WatchServiceDirectoryScanner`的内置功能,当文件从本地目录删除(StandardWatchEventKinds.ENTRY_DELETE)时执行`ResettableFileListFilter.remove()。 有关更多信息,请参见`WatchServiceDirectoryScanner

使用Java配置

以下Spring Boot应用程序展示了如何使用Java配置入站适配器:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> ftpMessageSource() {
        FtpInboundFileSynchronizingMessageSource source =
                new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("ftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用Java DSL进行配置

以下Spring Boot应用程序展示了如何使用Java DSL配置入站适配器:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlow
            .from(Ftp.inboundAdapter(this.ftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilename(f -> f.toUpperCase() + ".a")
                    .localDirectory(new File("d:\\ftp_files")),
                e -> e.id("ftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

处理不完整数据

请参阅处理不完整数据

提供了`FtpSystemMarkerFilePresentFileListFilter`来过滤远程系统中没有相应标记文件的远程文件。 有关配置信息,请参阅Javadoc(并浏览其父类)。