分裂器
分裂器是一个组件,其作用是将消息分成几个部分,并将生成的消息发送出去进行独立处理。它们通常是包含聚合器的管道中的上游生产者。
编程模型
执行分裂的 API 由一个基类 AbstractMessageSplitter
组成。它是一个 MessageHandler
实现,封装了分裂器共有的特性,例如在生成的消息中填充适当的消息头(CORRELATION_ID
、SEQUENCE_SIZE
和 SEQUENCE_NUMBER
)。这种填充使得能够跟踪消息及其处理结果(在典型场景中,这些消息头会被复制到由各种转换端点生成的消息中)。这些值随后可以被一个链接:https://www.enterpriseintegrationpatterns.com/DistributionAggregate.html[复合消息处理器] 使用,例如。
以下示例显示了 AbstractMessageSplitter
的摘录:
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的分裂器,您可以扩展 AbstractMessageSplitter
并实现 splitMessage
方法,其中包含分裂消息的逻辑。返回值可以是以下之一:
-
Collection
或消息数组,或遍历消息的Iterable
(或Iterator
)。在这种情况下,消息被作为消息发送(在填充CORRELATION_ID
、SEQUENCE_SIZE
和SEQUENCE_NUMBER
之后)。使用这种方法可以提供更多的控制——例如,在分裂过程中填充自定义消息头。 -
Collection
或非消息对象数组,或遍历非消息对象的Iterable
(或Iterator
)。它的工作方式与前一种情况类似,只是每个集合元素都被用作消息负载。使用这种方法可以让您专注于领域对象,而无需考虑消息系统,并生成更易于测试的代码。 -
Message
或非消息对象(但不是集合或数组)。它的工作方式与前几种情况类似,只是发送单个消息。
在 Spring Integration 中,任何 POJO 都可以实现分裂算法,只要它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值按前面所述进行解释。输入参数可以是 Message
或简单的 POJO。在后一种情况下,分裂器接收传入消息的负载。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,并且通常更容易测试。
迭代器
从版本 4.1 开始,AbstractMessageSplitter
支持 Iterator
类型用于要分裂的 value
。请注意,在 Iterator
(或 Iterable
)的情况下,我们无法访问底层项目的数量,SEQUENCE_SIZE
消息头被设置为 0
。这意味着 <aggregator>
的默认 SequenceSizeReleaseStrategy
将不起作用,并且来自 splitter
的 CORRELATION_ID
的组将不会被释放;它将保持“不完整”状态。在这种情况下,您应该使用适当的自定义 ReleaseStrategy
,或者依赖 send-partial-result-on-expiry
以及 group-timeout
或 MessageGroupStoreReaper
。
从版本 5.0 开始,AbstractMessageSplitter
提供了 protected obtainSizeIfPossible()
方法,如果可能的话,允许确定 Iterable
和 Iterator
对象的大小。例如,XPathMessageSplitter
可以确定底层 NodeList
对象的大小。从版本 5.0.9 开始,此方法也正确返回 com.fasterxml.jackson.core.TreeNode
的大小。
Iterator
对象在分裂之前避免在内存中构建整个集合的需求时很有用。例如,当底层项目使用迭代或流从某些外部系统(例如数据库或 FTP MGET
)填充时。
Stream 和 Flux
从版本 5.0 开始,AbstractMessageSplitter
支持 Java Stream
和 Reactive Streams Publisher
类型用于要分裂的 value
。在这种情况下,目标 Iterator
是基于它们的迭代功能构建的。
此外,如果分裂器的输出通道是 ReactiveStreamsSubscribableChannel
的实例,AbstractMessageSplitter
将生成 Flux
结果而不是 Iterator
,并且输出通道将订阅此 Flux
,以根据下游流的需求进行基于背压的分裂。
从版本 5.2 开始,分裂器支持 discardChannel
选项,用于发送那些分裂函数返回空容器(集合、数组、流、Flux
等)的请求消息。在这种情况下,没有项目可迭代以发送到 outputChannel
。null
分裂结果仍然是流结束的指示器。
使用 Java、Groovy 和 Kotlin DSL 配置分裂器
一个基于 Message
及其可迭代负载的简单分裂器示例,带 DSL 配置:
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
有关 DSL 的更多信息,请参阅相应的章节:
使用 XML 配置分裂器
分裂器可以通过 XML 配置如下:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" [id="CO1-1"]1
ref="splitterBean" [id="CO1-2"]2
method="split" [id="CO1-3"]3
input-channel="inputChannel" [id="CO1-4"]4
output-channel="outputChannel" [id="CO1-5"]5
discard-channel="discardChannel" /> [id="CO1-6"]6
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 分裂器的 ID 是可选的。 |
2 | 对应用程序上下文中定义的 bean 的引用。该 bean 必须实现分裂逻辑,如前一节所述。可选。如果未提供对 bean 的引用,则假定到达 input-channel 的消息的负载是 java.util.Collection 的实现,并且默认的分裂逻辑应用于该集合,将每个单独的元素合并到消息中并将其发送到 output-channel 。 |
3 | 实现分裂逻辑的方法(在 bean 上定义)。可选。 |
4 | 分裂器的输入通道。必需。 |
5 | 分裂器将传入消息的分裂结果发送到的通道。可选(因为传入消息可以自己指定回复通道)。 |
6 | 在分裂结果为空的情况下,请求消息发送到的通道。可选(它们将像 null 结果一样停止)。 |
如果自定义分裂器实现可以在其他 <splitter>
定义中引用,我们建议使用 ref
属性。但是,如果自定义分裂器处理程序实现应该限定为 <splitter>
的单个定义,您可以配置一个内部 bean 定义,如下例所示:
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允许在同一个 |
如果 ref
属性引用扩展 AbstractMessageProducingHandler
的 bean(例如框架本身提供的分裂器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个 ref
都必须是一个单独的 bean 实例(或 prototype
作用域的 bean),或者使用内部 <bean/>
配置类型。但是,此优化仅在您未在分裂器 XML 定义中提供任何分裂器特定属性时才适用。如果您不小心从多个 bean 引用了相同的消息处理程序,您将收到配置异常。