文件拆分器
FileSplitter
在 4.1.2 版本中添加,其命名空间支持在 4.2 版本中添加。
FileSplitter
根据 BufferedReader.readLine()
将文本文件拆分为单独的行。
默认情况下,拆分器使用 Iterator
一次发射一行,就像从文件中读取一样。
将 iterator
属性设置为 false
会导致它在将所有行作为消息发出之前将它们读入内存。
这样做的用例之一是,您可能希望在发送任何包含行的消息之前检测文件上的 I/O 错误。
然而,它只适用于相对较短的文件。
入站负载可以是 File
、String
(一个 File
路径)、InputStream
或 Reader
。
其他负载类型保持不变地发出。
以下列表显示了配置 FileSplitter
的可能方式:
- Java DSL
-
@SpringBootApplication public class FileSplitterApplication { public static void main(String[] args) { new SpringApplicationBuilder(FileSplitterApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow fileSplitterFlow() { return IntegrationFlow .from(Files.inboundAdapter(tmpDir.getRoot()) .filter(new ChainFileListFilter<File>() .addFilter(new AcceptOnceFileListFilter<>()) .addFilter(new ExpressionFileListFilter<>( new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName())))))) .split(Files.splitter() .markers() .charset(StandardCharsets.US_ASCII) .firstLineAsHeader("fileHeader") .applySequence(true)) .channel(c -> c.queue("fileSplittingResultChannel")) .get(); } }
- Kotlin DSL
-
@Bean fun fileSplitterFlow() = integrationFlow( Files.inboundAdapter(tmpDir.getRoot()) .filter( ChainFileListFilter<File?>() .addFilter(AcceptOnceFileListFilter()) .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name })) ) ) { split( Files.splitter() .markers() .charset(StandardCharsets.US_ASCII) .firstLineAsHeader("fileHeader") .applySequence(true) ) channel { queue("fileSplittingResultChannel") } }
- Java
-
@Splitter(inputChannel="toSplitter") @Bean public MessageHandler fileSplitter() { FileSplitter splitter = new FileSplitter(true, true); splitter.setApplySequence(true); splitter.setOutputChannel(outputChannel); return splitter; }
- XML
-
<int-file:splitter id="splitter" [id="CO1-1"]1 iterator="" [id="CO1-2"]2 markers="" [id="CO1-3"]3 markers-json="" [id="CO1-4"]4 apply-sequence="" [id="CO1-5"]5 requires-reply="" [id="CO1-6"]6 charset="" [id="CO1-7"]7 first-line-as-header="" [id="CO1-8"]8 input-channel="" [id="CO1-9"]9 output-channel="" [id="CO1-10"]10 send-timeout="" [id="CO1-11"]11 auto-startup="" [id="CO1-12"]12 order="" [id="CO1-13"]13 phase="" /> [id="CO1-14"]14
<1> 拆分器的 bean 名称。 <1> 设置为
true
(默认值)以使用迭代器,或设置为false
以在发送行之前将文件加载到内存中。 <1> 设置为true
以在文件数据之前和之后发出文件开始和文件结束标记消息。 标记是带有FileSplitter.FileMarker
负载(mark
属性中带有START
和END
值)的消息。 当在下游流中顺序处理文件时,如果某些行被过滤,您可能会使用标记。 它们使下游处理能够知道文件何时已完全处理。 此外,一个包含START
或END
的file_marker
头被添加到这些消息中。END
标记包含行计数。 如果文件为空,则只发出START
和END
标记,lineCount
为0
。 默认值为false
。 当为true
时,apply-sequence
默认为false
。 另请参见markers-json
(下一个属性)。 <1> 当markers
为 true 时,将其设置为true
以使FileMarker
对象转换为 JSON 字符串。 (底层使用SimpleJsonSerializer
)。 <1> 设置为false
以禁用在消息中包含sequenceSize
和sequenceNumber
头。 默认值为true
,除非markers
为true
。 当为true
且markers
为true
时,标记包含在序列中。 当为true
且iterator
为true
时,sequenceSize
头设置为0
,因为大小未知。 <1> 设置为true
以在文件中没有行时抛出RequiresReplyException
。 默认值为false
。 <1> 设置读取文本数据到String
负载时使用的字符集名称。 默认值为平台字符集。 <1> 第一行作为头部的名称,该头部将随剩余行发出的消息一起携带。 自 5.0 版本起。 <1> 设置用于向拆分器发送消息的输入通道。 <1> 设置消息发送到的输出通道。 <1> 设置发送超时。 仅当output-channel
可以阻塞时才适用 — 例如一个已满的QueueChannel
。 <1> 设置为false
以在上下文刷新时禁用自动启动拆分器。 默认值为true
。 <1> 如果input-channel
是<publish-subscribe-channel/>
,则设置此端点的顺序。 <1> 设置拆分器的启动阶段(当auto-startup
为true
时使用)。FileSplitter
还会将任何基于文本的InputStream
拆分为行。 从 4.3 版本开始,当与 FTP 或 SFTP 流式入站通道适配器或使用stream
选项检索文件的 FTP 或 SFTP 出站网关结合使用时,当文件完全消耗后,拆分器会自动关闭支持流的会话。 有关这些功能的更多信息,请参见 FTP 流式入站通道适配器 和 SFTP 流式入站通道适配器 以及 FTP 出站网关 和 SFTP 出站网关。 当使用 Java 配置时,可以使用附加的构造函数,如以下示例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
当 markersJson
为 true 时,标记表示为 JSON 字符串(使用 SimpleJsonSerializer
)。
版本 5.0 引入了 firstLineAsHeader
选项,用于指定内容的第一行是头部(例如 CSV 文件中的列名)。
传递给此属性的参数是头部名称,第一行将作为头部随剩余行发出的消息一起携带。
此行不包含在序列头部(如果 applySequence
为 true)中,也不包含在与 FileMarker.END
关联的 lineCount
中。
NOTE: 从 5.5 版本开始,lineCount
也作为 FileHeaders.LINE_COUNT
包含在 FileMarker.END
消息的头部中,因为 FileMarker
可以序列化为 JSON。
如果文件只包含头部行,则该文件被视为空文件,因此在拆分过程中只发出 FileMarker
实例(如果启用了标记 — 否则不发出任何消息)。
默认情况下(如果没有设置头部名称),第一行被视为数据,并成为第一个发出消息的负载。
如果您需要更复杂的关于文件内容中头部提取的逻辑(不是第一行,不是整行内容,不是某个特定头部等等),请考虑在 FileSplitter
之前使用 头部增强器。
请注意,已移动到头部的行可能会在下游从正常的内容处理中过滤掉。
== 拆分文件的幂等下游处理
当 apply-sequence
为 true 时,拆分器在 SEQUENCE_NUMBER
头中添加行号(当 markers
为 true 时,标记也被计为行)。
行号可以与 幂等接收器 一起使用,以避免在重新启动后重新处理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}