分裂器

分裂器是一个组件,其作用是将消息分成几个部分,并将生成的消息发送出去进行独立处理。它们通常是包含聚合器的管道中的上游生产者。

编程模型

执行分裂的 API 由一个基类 AbstractMessageSplitter 组成。它是一个 MessageHandler 实现,封装了分裂器共有的特性,例如在生成的消息中填充适当的消息头(CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)。这种填充使得能够跟踪消息及其处理结果(在典型场景中,这些消息头会被复制到由各种转换端点生成的消息中)。这些值随后可以被一个链接:https://www.enterpriseintegrationpatterns.com/DistributionAggregate.html[复合消息处理器] 使用,例如。

以下示例显示了 AbstractMessageSplitter 的摘录:

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

要在应用程序中实现特定的分裂器,您可以扩展 AbstractMessageSplitter 并实现 splitMessage 方法,其中包含分裂消息的逻辑。返回值可以是以下之一:

  • Collection 或消息数组,或遍历消息的 Iterable(或 Iterator)。在这种情况下,消息被作为消息发送(在填充 CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER 之后)。使用这种方法可以提供更多的控制——例如,在分裂过程中填充自定义消息头。

  • Collection 或非消息对象数组,或遍历非消息对象的 Iterable(或 Iterator)。它的工作方式与前一种情况类似,只是每个集合元素都被用作消息负载。使用这种方法可以让您专注于领域对象,而无需考虑消息系统,并生成更易于测试的代码。

  • Message 或非消息对象(但不是集合或数组)。它的工作方式与前几种情况类似,只是发送单个消息。

在 Spring Integration 中,任何 POJO 都可以实现分裂算法,只要它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值按前面所述进行解释。输入参数可以是 Message 或简单的 POJO。在后一种情况下,分裂器接收传入消息的负载。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,并且通常更容易测试。

迭代器

从版本 4.1 开始,AbstractMessageSplitter 支持 Iterator 类型用于要分裂的 value。请注意,在 Iterator(或 Iterable)的情况下,我们无法访问底层项目的数量,SEQUENCE_SIZE 消息头被设置为 0。这意味着 <aggregator> 的默认 SequenceSizeReleaseStrategy 将不起作用,并且来自 splitterCORRELATION_ID 的组将不会被释放;它将保持“不完整”状态。在这种情况下,您应该使用适当的自定义 ReleaseStrategy,或者依赖 send-partial-result-on-expiry 以及 group-timeoutMessageGroupStoreReaper

从版本 5.0 开始,AbstractMessageSplitter 提供了 protected obtainSizeIfPossible() 方法,如果可能的话,允许确定 IterableIterator 对象的大小。例如,XPathMessageSplitter 可以确定底层 NodeList 对象的大小。从版本 5.0.9 开始,此方法也正确返回 com.fasterxml.jackson.core.TreeNode 的大小。

Iterator 对象在分裂之前避免在内存中构建整个集合的需求时很有用。例如,当底层项目使用迭代或流从某些外部系统(例如数据库或 FTP MGET)填充时。

Stream 和 Flux

从版本 5.0 开始,AbstractMessageSplitter 支持 Java Stream 和 Reactive Streams Publisher 类型用于要分裂的 value。在这种情况下,目标 Iterator 是基于它们的迭代功能构建的。

此外,如果分裂器的输出通道是 ReactiveStreamsSubscribableChannel 的实例,AbstractMessageSplitter 将生成 Flux 结果而不是 Iterator,并且输出通道将订阅此 Flux,以根据下游流的需求进行基于背压的分裂。

从版本 5.2 开始,分裂器支持 discardChannel 选项,用于发送那些分裂函数返回空容器(集合、数组、流、Flux 等)的请求消息。在这种情况下,没有项目可迭代以发送到 outputChannelnull 分裂结果仍然是流结束的指示器。

使用 Java、Groovy 和 Kotlin DSL 配置分裂器

一个基于 Message 及其可迭代负载的简单分裂器示例,带 DSL 配置:

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
@Bean
someFlow() {
    integrationFlow {
        splitWith {
		    expectedType Message<?>
		    function { it.payload }
        }
    }
}

有关 DSL 的更多信息,请参阅相应的章节:

使用 XML 配置分裂器

分裂器可以通过 XML 配置如下:

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           [id="CO1-1"]1
  ref="splitterBean"                  [id="CO1-2"]2
  method="split"                      [id="CO1-3"]3
  input-channel="inputChannel"        [id="CO1-4"]4
  output-channel="outputChannel"      [id="CO1-5"]5
  discard-channel="discardChannel" /> [id="CO1-6"]6

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 分裂器的 ID 是可选的。
2 对应用程序上下文中定义的 bean 的引用。该 bean 必须实现分裂逻辑,如前一节所述。可选。如果未提供对 bean 的引用,则假定到达 input-channel 的消息的负载是 java.util.Collection 的实现,并且默认的分裂逻辑应用于该集合,将每个单独的元素合并到消息中并将其发送到 output-channel
3 实现分裂逻辑的方法(在 bean 上定义)。可选。
4 分裂器的输入通道。必需。
5 分裂器将传入消息的分裂结果发送到的通道。可选(因为传入消息可以自己指定回复通道)。
6 在分裂结果为空的情况下,请求消息发送到的通道。可选(它们将像 null 结果一样停止)。

如果自定义分裂器实现可以在其他 <splitter> 定义中引用,我们建议使用 ref 属性。但是,如果自定义分裂器处理程序实现应该限定为 <splitter> 的单个定义,您可以配置一个内部 bean 定义,如下例所示:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>

不允许在同一个 <int:splitter> 配置中同时使用 ref 属性和内部处理程序定义,因为它会创建模糊条件并导致抛出异常。

如果 ref 属性引用扩展 AbstractMessageProducingHandler 的 bean(例如框架本身提供的分裂器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个 ref 都必须是一个单独的 bean 实例(或 prototype 作用域的 bean),或者使用内部 <bean/> 配置类型。但是,此优化仅在您未在分裂器 XML 定义中提供任何分裂器特定属性时才适用。如果您不小心从多个 bean 引用了相同的消息处理程序,您将收到配置异常。

使用注解配置分裂器

@Splitter 注解适用于期望 Message 类型或消息负载类型的方法,并且方法的返回值应该是一个任何类型的 Collection。如果返回的值不是实际的 Message 对象,则每个项目都将作为 Message 的负载包装在 Message 中。每个生成的消息都会发送到定义了 @Splitter 的端点的指定输出通道。

以下示例演示了如何使用 @Splitter 注解配置分裂器:

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}