File Aggregator
从版本 5.5 开始,引入了“FileAggregator
”来涵盖在启用 START/END 标记时“FileSplitter
”使用案例的另一面。为了方便,“FileAggregator
”实现了所有三个序列详细信息策略:
Starting with version 5.5, a FileAggregator
is introduced to cover other side of FileSplitter
use-case when START/END markers are enabled.
For convenience the FileAggregator
implements all three sequence details strategies:
-
具有 `FileHeaders.FILENAME`属性的 `HeaderAttributeCorrelationStrategy`用于关联键计算。当在 `FileSplitter`上启用标记时,它不会填充序列详情标头,因为开始/结束标记消息也包含在序列大小中。`FileHeaders.FILENAME`仍会为发出的每一行(包括开始/结束标记消息)填充。
-
The
HeaderAttributeCorrelationStrategy
with theFileHeaders.FILENAME
attribute is used for correlation key calculation. When markers are enabled on theFileSplitter
, it does not populate sequence details headers, since START/END marker messages are also included into the sequence size. TheFileHeaders.FILENAME
is still populated for each line emitted, including START/END marker messages. -
FileMarkerReleaseStrategy
- 检查组中的FileSplitter.FileMarker.Mark.END`消息,然后将 `FileHeaders.LINE_COUNT`标头值与组大小减去 `2
- `FileSplitter.FileMarker`实例进行比较。它还实现在 `GroupConditionProvider`中使用的 `AbstractCorrelatingMessageHandler`的一个便利 `conditionSupplier`联系人。有关更多信息,请参见 Message Group Condition。 -
The
FileMarkerReleaseStrategy
- checks forFileSplitter.FileMarker.Mark.END
message in the group and then compare aFileHeaders.LINE_COUNT
header value with the group size minus2
-FileSplitter.FileMarker
instances. It also implements a convenientGroupConditionProvider
contact forconditionSupplier
function to be used in theAbstractCorrelatingMessageHandler
. See Message Group Condition for more information. -
`FileAggregatingMessageGroupProcessor`只从组中移除 `FileSplitter.FileMarker`消息并收集其余消息到列表有效负载中以进行产生。
-
The
FileAggregatingMessageGroupProcessor
just removesFileSplitter.FileMarker
messages from the group and collect the rest of messages into a list payload to produce.
以下清单展示了配置“FileAggregator
”的可能方法:
The following listing shows possible ways to configure a FileAggregator
:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
如果“FileAggregator
”的默认行为不能满足目标逻辑,建议使用各个策略配置聚合器端点。有关详细信息,请参阅“FileAggregator
”JavaDocs。
If default behavior of the FileAggregator
does not satisfy the target logic, it is recommended to configure an aggregator endpoint with individual strategies.
See FileAggregator
JavaDocs for more information.