读取文件
FileReadingMessageSource
可用于从文件系统消费文件。
这是 MessageSource
的一个实现,它从文件系统目录创建消息。
以下示例展示了如何配置 FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
为了防止为某些文件创建消息,您可以提供一个 FileListFilter
。
默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
确保隐藏文件不被处理。
请注意,隐藏文件的确切定义是系统相关的。
例如,在基于 UNIX 的系统中,以句点字符开头的文件被认为是隐藏的。
而 Microsoft Windows 则有一个专门的文件属性来指示隐藏文件。
版本 4.2 引入了 IgnoreHiddenFileListFilter
。
在早期版本中,隐藏文件是包含在内的。
在默认配置下,IgnoreHiddenFileListFilter
会首先触发,然后是 AcceptOnceFileListFilter
。
AcceptOnceFileListFilter
确保文件只从目录中选取一次。
|
持久文件列表过滤器现在有一个布尔属性 forRecursion
。
将此属性设置为 true
,同时也会设置 alwaysAcceptDirectories
,这意味着出站网关(ls
和 mget
)上的递归操作现在每次都会遍历完整的目录树。
这是为了解决目录树深处的变化未被检测到的问题。
此外,forRecursion=true
会导致将文件的完整路径用作元数据存储键;这解决了当同一名称的文件在不同目录中多次出现时过滤器无法正常工作的问题。
IMPORTANT: 这意味着在持久元数据存储中,对于顶层目录下的文件,现有键将无法找到。
因此,该属性默认为 false
;这可能会在未来的版本中改变。
以下示例配置了一个带有过滤器的 FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的一个常见问题是,文件可能在准备好之前就被检测到(即,其他进程可能仍在写入文件)。
默认的 AcceptOnceFileListFilter
并不能阻止这种情况。
在大多数情况下,如果文件写入进程在文件准备好读取后立即重命名每个文件,则可以防止这种情况。
一个 filename-pattern
或 filename-regex
过滤器,它只接受已准备好的文件(可能基于已知后缀),与默认的 AcceptOnceFileListFilter
组合使用,可以处理这种情况。
CompositeFileListFilter
允许组合,如下例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果不可能使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种替代方案。
版本 4.2 添加了 LastModifiedFileListFilter
。
此过滤器可以用 age
属性进行配置,以便只有比此值更旧的文件才能通过过滤器。
age
默认为 60 秒,但您应该选择一个足够大的 age
以避免过早地选取文件(例如,由于网络故障)。
以下示例展示了如何配置 LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本 4.3.7 开始,引入了 ChainFileListFilter
(CompositeFileListFilter
的扩展),以允许后续过滤器只查看前一个过滤器结果的场景。
(使用 CompositeFileListFilter
,所有过滤器都查看所有文件,但它只传递通过所有过滤器的文件)。
需要新行为的一个示例是 LastModifiedFileListFilter
和 AcceptOnceFileListFilter
的组合,当我们不希望在经过一段时间后才接受文件时。
使用 CompositeFileListFilter
,由于 AcceptOnceFileListFilter
在第一次传递时看到了所有文件,因此当其他过滤器生效时,它不会在以后传递。
CompositeFileListFilter
方法在模式过滤器与自定义过滤器结合使用时很有用,该自定义过滤器查找辅助文件以指示文件传输已完成。
模式过滤器可能只传递主文件(例如 something.txt
),但“完成”过滤器需要查看(例如)something.done
是否存在。
假设我们有文件 a.txt
、a.done
和 b.txt
。
模式过滤器只传递 a.txt
和 b.txt
,而“完成”过滤器看到所有三个文件并只传递 a.txt
。
组合过滤器的最终结果是只释放 a.txt
。
使用 |
版本 5.0 引入了 ExpressionFileListFilter
,用于针对文件执行 SpEL 表达式作为上下文评估根对象。
为此,所有用于文件处理(本地和远程)的 XML 组件,以及现有的 filter
属性,都提供了 filter-expression
选项,如下例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了 DiscardAwareFileListFilter
实现,它们对被拒绝的文件感兴趣。
为此,此类过滤器实现应通过 addDiscardCallback(Consumer<File>)
提供回调。
在框架中,此功能从 FileReadingMessageSource.WatchServiceDirectoryScanner
与 LastModifiedFileListFilter
结合使用。
与常规的 DirectoryScanner
不同,WatchService
根据目标文件系统上的事件提供文件进行处理。
在轮询内部队列中的这些文件时,LastModifiedFileListFilter
可能会因为它们相对于其配置的 age
太年轻而丢弃它们。
因此,我们丢失了文件以供将来可能的考虑。
丢弃回调钩子允许我们将文件保留在内部队列中,以便在后续轮询中检查其 age
。
CompositeFileListFilter
也实现了 DiscardAwareFileListFilter
,并将其丢弃回调填充到其所有 DiscardAwareFileListFilter
委托中。
由于 |
从版本 5.1 开始,FileReadingMessageSource
在其 start()
被调用(通常通过包装 SourcePollingChannelAdapter
)之前,不会检查目录是否存在,也不会创建它。
以前,当引用目录时(例如从测试中,或权限稍后应用时),没有简单的方法可以防止操作系统权限错误。
与 LastModifiedFileListFilter
相反,从版本 6.5 开始引入了 RecentFileListFilter
策略。
它是 AbstractRecentFileListFilter
的本地文件系统扩展。
默认情况下,它接受不超过 1 天的文件。
请参阅其其他实现以了解相应的远程文件协议。
消息头
从版本 5.0 开始,FileReadingMessageSource
(除了作为轮询 File
的 payload
)还会将以下头填充到出站 Message
中:
-
FileHeaders.FILENAME
:要发送的文件的File.getName()
。 可用于后续的重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:File
对象本身。 通常,当原始File
对象丢失时,此头由框架组件(例如 拆分器 或 转换器)自动填充。 但是,为了与任何其他自定义用例保持一致和方便,此头对于访问原始文件可能很有用。 -
FileHeaders.RELATIVE_PATH
:引入的新头,表示文件路径中相对于扫描根目录的部分。 当需要在其他地方恢复源目录层次结构时,此头可能很有用。 为此,DefaultFileNameGenerator
(请参阅“生成文件名”)可以配置为使用此头。
目录扫描和轮询
FileReadingMessageSource
不会立即为目录中的文件生成消息。
它使用一个内部队列来存储 scanner
返回的“符合条件的文件”。
scanEachPoll
选项用于确保内部队列在每次轮询时都用最新的输入目录内容进行刷新。
默认情况下(scanEachPoll = false
),FileReadingMessageSource
会清空其队列,然后再次扫描目录。
这种默认行为对于减少对目录中大量文件的扫描特别有用。
但是,在需要自定义排序的情况下,考虑将此标志设置为 true
的影响很重要。
文件的处理顺序可能不像预期那样。
默认情况下,队列中的文件按其自然 (path
) 顺序处理。
即使队列中已有文件,扫描添加的新文件也会插入到适当的位置以保持该自然顺序。
要自定义顺序,FileReadingMessageSource
可以接受 Comparator<File>
作为构造函数参数。
它由内部 (PriorityBlockingQueue
) 使用,以根据业务需求重新排序其内容。
因此,要按特定顺序处理文件,您应该向 FileReadingMessageSource
提供一个比较器,而不是对自定义 DirectoryScanner
生成的列表进行排序。
版本 5.0 引入了 RecursiveDirectoryScanner
来执行文件树访问。
该实现基于 Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能。
根目录 (DirectoryScanner.listFiles(File)
) 参数从结果中排除。
所有其他子目录的包含和排除都基于目标 FileListFilter
实现。
例如,SimplePatternFileListFilter
默认过滤掉目录。
有关更多信息,请参阅 AbstractDirectoryAwareFileListFilter
及其实现。
从版本 5.5 开始,Java DSL 的 |
命名空间支持
文件读取的配置可以通过使用文件特定的命名空间来简化。 为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间中,您可以将 FileReadingMessageSource
简化并将其包装在入站通道适配器中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的 FileListFilter
实现:
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,您也可以省略 prevent-duplicates
和 ignore-hidden
属性,因为它们默认为 true
。
Spring Integration 4.2 引入了 ignore-hidden
属性。
在早期版本中,隐藏文件是包含在内的。
第二个通道适配器示例使用自定义过滤器,第三个使用 filename-pattern
属性添加基于 AntPathMatcher
的过滤器,第四个使用 filename-regex
属性向 FileReadingMessageSource
添加基于正则表达式模式的过滤器。
filename-pattern
和 filename-regex
属性各自与常规 filter
引用属性互斥。
但是,您可以使用 filter
属性引用 CompositeFileListFilter
的实例,该实例结合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。
当多个进程从同一目录读取时,您可能希望锁定文件以防止它们被并发拾取。
为此,您可以使用 FileLocker
。
有一个基于 java.nio
的实现可用,但也可以实现自己的锁定方案。
nio
锁定器可以注入,如下所示:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以配置自定义锁定器,如下所示:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置有锁定器时,它负责在文件被允许接收之前获取锁定。
它不承担解锁文件的责任。
如果您已处理文件并保持锁定悬挂,则会出现内存泄漏。
如果这是一个问题,您应该在适当的时候自己调用 |
当过滤和锁定文件不足时,您可能需要完全控制文件的列出方式。
要实现此类要求,您可以使用 DirectoryScanner
的实现。
此扫描器允许您精确确定每次轮询中列出的文件。
这也是 Spring Integration 内部用于将 FileListFilter
实例和 FileLocker
连接到 FileReadingMessageSource
的接口。
您可以通过 scanner
属性将自定义 DirectoryScanner
注入到 <int-file:inbound-channel-adapter/>
中,如下例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以为您提供完全的自由来选择排序、列表和锁定策略。
了解过滤器(包括 patterns
、regex
、prevent-duplicates
等)和 locker
实例实际上由 scanner
使用也很重要。
适配器上设置的任何这些属性随后都会注入到内部 scanner
中。
对于外部 scanner
的情况,FileReadingMessageSource
上禁止所有过滤器和锁定器属性。
它们必须(如果需要)在该自定义 DirectoryScanner
上指定。
换句话说,如果您将 scanner
注入到 FileReadingMessageSource
中,您应该在该 scanner
上提供 filter
和 locker
,而不是在 FileReadingMessageSource
上。
默认情况下, |
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner
依赖于文件系统事件,当新文件添加到目录时。
在初始化期间,目录被注册以生成事件。
初始文件列表也在初始化期间构建。
在遍历目录树时,遇到的任何子目录也会注册以生成事件。
在第一次轮询时,返回遍历目录的初始文件列表。
在随后的轮询中,返回来自新创建事件的文件。
如果添加了新的子目录,其创建事件将用于遍历新的子树以查找现有文件并注册任何找到的新子目录。
|
WatchServiceDirectoryScanner
可以通过 FileReadingMessageSource.use-watch-service
选项启用,该选项与 scanner
选项互斥。
将为提供的 directory
填充一个内部 FileReadingMessageSource.WatchServiceDirectoryScanner
实例。
此外,现在 WatchService
轮询逻辑可以跟踪 StandardWatchEventKinds.ENTRY_MODIFY
和 StandardWatchEventKinds.ENTRY_DELETE
。
如果您需要跟踪现有文件的修改以及新文件,您应该在 FileListFilter
中实现 ENTRY_MODIFY
事件逻辑。
否则,来自这些事件的文件将以相同的方式处理。
ResettableFileListFilter
实现会处理 ENTRY_DELETE
事件。
因此,它们的文件被提供给 remove()
操作。
当此事件启用时,像 AcceptOnceFileListFilter
这样的过滤器会将文件删除。
结果,如果出现同名文件,它将通过过滤器并作为消息发送。
为此,引入了 watch-events
属性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)。
(WatchEventType
是 FileReadingMessageSource
中的一个公共内部枚举。)
通过此选项,我们可以对新文件使用一种下游流逻辑,对修改后的文件使用其他逻辑。
以下示例展示了如何在同一目录中为创建和修改事件配置不同的逻辑:
值得一提的是,ENTRY_DELETE
事件涉及被监视目录的子目录的重命名操作。
更具体地说,与旧目录名相关的 ENTRY_DELETE
事件先于通知新(重命名)目录的 ENTRY_CREATE
事件。
在某些操作系统(如 Windows)上,必须注册 ENTRY_DELETE
事件才能处理这种情况。
否则,在文件资源管理器中重命名被监视的子目录可能会导致在该子目录中无法检测到新文件。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从版本 6.1 开始,FileReadingMessageSource
暴露了两个新的 WatchService
相关选项:
-
watchMaxDepth
-Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
API 的参数; -
watchDirPredicate
- 一个Predicate<Path>
,用于测试扫描树中的目录是否应该被遍历并注册到WatchService
和配置的监视事件类型。
限制内存消耗
您可以使用 HeadDirectoryScanner
来限制内存中保留的文件数量。
这在扫描大型目录时很有用。
使用 XML 配置时,通过在入站通道适配器上设置 queue-size
属性来启用此功能。
在版本 4.2 之前,此设置与任何其他过滤器的使用不兼容。
任何其他过滤器(包括 prevent-duplicates="true"
)都会覆盖用于限制大小的过滤器。
|
使用 Java 配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
“tail”文件
另一个流行的用例是从文件末尾(或尾部)获取“行”,并在添加新行时捕获它们。
提供了两种实现。
第一种是 OSDelegatingFileTailingMessageProducer
,它使用本地 tail
命令(在具有该命令的操作系统上)。
这通常是这些平台上最有效的实现。
对于没有 tail
命令的操作系统,第二种实现 ApacheCommonsFileTailingMessageProducer
使用 Apache commons-io
的 Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都通过正常的 Spring 事件发布机制作为 ApplicationEvent
实例发布。
此类事件的示例包括:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
上例中所示的事件序列可能发生,例如,当文件轮换时。
从版本 5.0 开始,当 idleEventInterval
期间文件中没有数据时,会发出 FileTailingIdleEvent
。
以下示例展示了此类事件的样子:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持 |
从这些端点发出的消息具有以下头:
-
FileHeaders.ORIGINAL_FILE
:File
对象 -
FileHeaders.FILENAME
:文件名 (File.getName()
)
在版本 5.0 之前, |
以下示例使用默认选项(“-F -n 0”,表示从当前末尾跟踪文件名)创建一个本地适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例使用“-F -n +0”选项(表示跟踪文件名,发出所有现有行)创建一个本地适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果 tail
命令失败(在某些平台上,文件丢失会导致 tail
失败,即使指定了 -F
),该命令将每 10 秒重试一次。
默认情况下,本地适配器从标准输出捕获并将其内容作为消息发送。
它们还从标准错误捕获以引发事件。
从版本 4.3.6 开始,您可以通过将 enable-status-reader
设置为 false
来丢弃标准错误事件,如下例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为 5000
,这意味着如果五秒钟没有写入行,FileTailingIdleEvent
将每五秒触发一次:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当您需要停止适配器时,这可能很有用。
以下示例创建了一个 Apache commons-io
Tailer
适配器,它每两秒检查文件中的新行,每十秒检查一次缺失文件的存在:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" [id="CO1-1"]1
reopen="true" [id="CO1-2"]2
file-delay="10000"/>
<1> 文件从开头 (`end="false"`) 跟踪,而不是从末尾(这是默认值)。 <1> 文件为每个块重新打开(默认是保持文件打开)。
指定 delay
、end
或 reopen
属性会强制使用 Apache commons-io
适配器,并使 native-options
属性不可用。