聚合器

聚合器基本上是拆分器的镜像,它是一种消息处理器,接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游消费者。 从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。 它必须持有要聚合的消息,并确定何时准备好聚合完整的消息组。 为此,它需要一个 MessageStore

功能

聚合器通过关联和存储一组相关消息,直到该组被认为是完整的,从而将它们组合起来。 此时,聚合器通过处理整个组来创建单个消息,并将聚合后的消息作为输出发送。

实现聚合器需要提供执行聚合的逻辑(即从多条消息创建单条消息)。 两个相关的概念是关联和释放。

关联决定了消息如何分组以进行聚合。 在 Spring Integration 中,默认情况下,关联是基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头完成的。 具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID 的消息被分组在一起。 但是,您可以自定义关联策略,以允许其他方式指定消息应如何分组。 为此,您可以实现一个 CorrelationStrategy(本章稍后介绍)。

为了确定消息组何时准备好进行处理,需要咨询 ReleaseStrategy。 聚合器的默认释放策略是当序列中包含的所有消息都存在时(基于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头),释放一个组。 您可以通过提供对自定义 ReleaseStrategy 实现的引用来覆盖此默认策略。

编程模型

聚合 API 由许多类组成:

  • 接口 MessageGroupProcessor 及其子类:MethodInvokingAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessor

  • ReleaseStrategy 接口及其默认实现:SimpleSequenceSizeReleaseStrategy

  • CorrelationStrategy 接口及其默认实现:HeaderAttributeCorrelationStrategy

AggregatingMessageHandler

AggregatingMessageHandlerAbstractCorrelatingMessageHandler 的子类)是一个 MessageHandler 实现,封装了聚合器(以及其他关联用例)的常见功能,如下所示:

  • 将消息关联到要聚合的组中

  • MessageStore 中维护这些消息,直到该组可以释放

  • 决定何时可以释放该组

  • 将已释放的组聚合为单个消息

  • 识别并响应已过期的组

决定消息应如何分组的责任委托给 CorrelationStrategy 实例。 决定消息组是否可以释放的责任委托给 ReleaseStrategy 实例。

以下清单显示了基类 AbstractAggregatingMessageGroupProcessor 的简要亮点(实现 aggregatePayloads 方法的责任留给开发人员):

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

请参阅 DefaultAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessorMethodInvokingMessageGroupProcessor 作为 AbstractAggregatingMessageGroupProcessor 的开箱即用实现。

从版本 5.2 开始,AbstractAggregatingMessageGroupProcessor 提供了一个 Function<MessageGroup, Map<String, Object>> 策略,用于合并和计算(聚合)输出消息的头。 DefaultAggregateHeadersFunction 实现提供了这样的逻辑:返回组中所有没有冲突的头;组中一个或多个消息上不存在的头不被视为冲突。 冲突的头将被省略。 与新引入的 DelegatingMessageGroupProcessor 一起,此函数用于任何任意(非 AbstractAggregatingMessageGroupProcessorMessageGroupProcessor 实现。 本质上,框架将提供的函数注入到 AbstractAggregatingMessageGroupProcessor 实例中,并将所有其他实现包装到 DelegatingMessageGroupProcessor 中。 AbstractAggregatingMessageGroupProcessorDelegatingMessageGroupProcessor 之间的逻辑差异在于,后者在调用委托策略之前不预先计算头,并且如果委托返回 MessageAbstractIntegrationMessageBuilder,则不调用该函数。 在这种情况下,框架假定目标实现已负责生成填充到返回结果中的一组适当的头。 Function<MessageGroup, Map<String, Object>> 策略在 XML 配置中作为 headers-function 引用属性可用,在 Java DSL 中作为 AggregatorSpec.headersFunction() 选项可用,在纯 Java 配置中作为 AggregatorFactoryBean.setHeadersFunction() 可用。

CorrelationStrategyAbstractCorrelatingMessageHandler 拥有,并具有基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头的默认值,如以下示例所示:

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

至于消息组的实际处理,默认实现是 DefaultAggregatingMessageGroupProcessor。 它创建一个 Message,其负载是给定组收到的负载的 List。 这对于上游包含拆分器、发布/订阅通道或收件人列表路由器的简单分散/聚合实现非常有效。

在此类场景中使用发布/订阅通道或收件人列表路由器时,请务必启用 apply-sequence 标志。 这样做会添加必要的头:CORRELATION_IDSEQUENCE_NUMBERSEQUENCE_SIZE。 对于 Spring Integration 中的拆分器,此行为默认启用,但对于发布/订阅通道或收件人列表路由器,则不启用,因为这些组件可能在各种上下文中用于不需要这些头的情况。

当为应用程序实现特定的聚合器策略时,您可以扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads 方法。 但是,还有更好的解决方案,与 API 的耦合度更低,用于实现聚合逻辑,可以通过 XML 或注解进行配置。

通常,任何 POJO 都可以实现聚合算法,如果它提供一个接受单个 java.util.List 作为参数的方法(也支持参数化列表)。 此方法用于聚合消息,如下所示:

  • 如果参数是 java.util.Collection<T> 并且参数类型 T 可分配给 Message,则发送累积用于聚合的整个消息列表到聚合器。

  • 如果参数是非参数化的 java.util.Collection 或参数类型不可分配给 Message,则该方法接收累积消息的有效负载。

  • 如果返回类型不可分配给 Message,则将其视为由框架自动创建的 Message 的有效负载。

为了简化代码并推广低耦合、可测试性等最佳实践,实现聚合逻辑的首选方式是通过 POJO 并使用 XML 或注解支持在应用程序中配置它。

从版本 5.3 开始,处理消息组后,AbstractCorrelatingMessageHandlerMessageBuilder.popSequenceDetails() 消息头进行修改,以实现具有多个嵌套级别的正确拆分器-聚合器场景。 只有当消息组释放结果不是消息集合时才执行此操作。 在这种情况下,目标 MessageGroupProcessor 负责在构建这些消息时调用 MessageBuilder.popSequenceDetails()

如果 MessageGroupProcessor 返回 Message,则只有当 sequenceDetails 与组中的第一条消息匹配时,才会在输出消息上执行 MessageBuilder.popSequenceDetails()。 (以前,只有当 MessageGroupProcessor 返回纯有效负载或 AbstractIntegrationMessageBuilder 时才执行此操作。)

此功能可以通过新的 popSequence boolean 属性进行控制,因此在某些情况下,当相关性详细信息未由标准拆分器填充时,可以禁用 MessageBuilder.popSequenceDetails()。 此属性本质上撤消了 AbstractMessageSplitter 中最近的上游 applySequence = true 所做的事情。 有关更多信息,请参阅 拆分器

SimpleMessageGroup.getMessages() 方法返回一个 unmodifiableCollection。 因此,如果聚合 POJO 方法具有 Collection<Message> 参数,则传入的参数就是该 Collection 实例,并且当您将 SimpleMessageStore 用于聚合器时,该原始 Collection<Message> 在释放组后被清除。 因此,如果 Collection<Message> 变量从聚合器中传出,它也会被清除。 如果您希望简单地按原样释放该集合以进行进一步处理,则必须构建一个新的 Collection(例如,new ArrayList<Message>(messages))。 从版本 4.3 开始,框架不再将消息复制到新集合,以避免不必要的额外对象创建。

在 4.2 版本之前,无法通过使用 XML 配置提供 MessageGroupProcessor。 只能使用 POJO 方法进行聚合。 现在,如果框架检测到引用的(或内部)bean 实现了 MessageProcessor,则将其用作聚合器的输出处理器。

如果您希望从自定义 MessageGroupProcessor 释放对象集合作为消息的有效负载,您的类应该扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads()

此外,从 4.2 版本开始,提供了 SimpleMessageGroupProcessor。 它返回组中的消息集合,如前所述,这会导致释放的消息单独发送。

这使得聚合器可以充当消息屏障,其中到达的消息被保留,直到释放策略触发并将组作为单个消息序列释放。

从 6.0 版本开始,上述拆分行为仅在组处理器是 SimpleMessageGroupProcessor 时才有效。 否则,对于任何其他返回 Collection<Message>MessageGroupProcessor 实现,只会发出一条回复消息,其有效负载是整个消息集合。 这种逻辑由聚合器的规范目的决定——通过某个键收集请求消息并生成单个分组消息。

在 6.5 版本之前,如果 MessageGroupProcessor(通常是 DSL 中的 lambda)返回有效负载集合,AbstractCorrelatingMessageHandler 会因 IllegalArgumentException 而失败,指出只可能返回消息集合。 从现在开始,这种限制已被消除,返回的有效负载集合将作为聚合器发出的单个回复消息,其中只包含最后一条请求消息的头。 如果需要头聚合以及有效负载集合,建议使用 AbstractAggregatingMessageGroupProcessor 实现而不是普通的 MessageGroupProcessor 函数接口。

ReleaseStrategy

ReleaseStrategy 接口定义如下:

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

通常,任何 POJO 都可以实现完成决策逻辑,如果它提供一个接受单个 java.util.List 作为参数(也支持参数化列表)并返回布尔值的方法。 此方法在每条新消息到达后调用,以决定组是否完成,如下所示:

  • 如果参数是 java.util.List<T> 并且参数类型 T 可分配给 Message,则发送累积在组中的整个消息列表到该方法。

  • 如果参数是非参数化的 java.util.List 或参数类型不可分配给 Message,则该方法接收累积消息的有效负载。

  • 如果消息组已准备好进行聚合,则该方法必须返回 true,否则返回 false

以下示例演示了如何将 @ReleaseStrategy 注解用于 Message 类型的 List

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

以下示例演示了如何将 @ReleaseStrategy 注解用于 String 类型的 List

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String>) {...}
}

根据前面两个示例中的签名,基于 POJO 的释放策略会传递一个尚未释放的消息 Collection(如果您需要访问完整的 Message)或一个有效负载对象 Collection(如果类型参数不是 Message)。 这满足了大多数用例。 但是,如果由于某种原因需要访问完整的 MessageGroup,则应提供 ReleaseStrategy 接口的实现。

处理潜在的大型组时,您应该了解这些方法的调用方式,因为释放策略可能会在组释放之前多次调用。 最有效的是 ReleaseStrategy 的实现,因为聚合器可以直接调用它。 其次有效的是具有 Collection<Message<?>> 参数类型的 POJO 方法。 效率最低的是具有 Collection<Something> 类型的 POJO 方法。 框架必须每次调用释放策略时,将有效负载从组中的消息复制到新集合中(并可能尝试将有效负载转换为 Something)。 使用 Collection<?> 避免了转换,但仍然需要创建新的 Collection。 出于这些原因,对于大型组,我们建议您实现 ReleaseStrategy

当组被释放以进行聚合时,所有尚未释放的消息都将被处理并从组中删除。 如果组也已完成(即,如果序列中的所有消息都已到达或未定义序列),则该组被标记为完成。 此组的任何新消息都将发送到丢弃通道(如果已定义)。 将 expire-groups-upon-completion 设置为 true(默认值为 false)会删除整个组,并且任何新消息(具有与已删除组相同的关联 ID)都会形成一个新组。 您可以使用 MessageGroupStoreReaper 并将 send-partial-result-on-expiry 设置为 true 来释放部分序列。

从 6.5 版本开始,关联处理器还可以配置 discardIndividuallyOnExpiry 选项,以将整个组作为单个消息丢弃。 本质上,此消息的有效负载是过期组中的消息列表。 仅当 sendPartialResultOnExpiry 设置为 false(默认值)且提供了 dicardChannel 时才有效。

为了方便丢弃迟到的消息,聚合器必须在消息组释放后维护其状态。 这最终可能导致内存不足。 为了避免这种情况,您应该考虑配置 MessageGroupStoreReaper 来删除组元数据。 过期参数应设置为在达到某个点后(不再期望迟到的消息到达)使组过期。 有关配置收割者的信息,请参阅 reaper

Spring Integration 提供了 ReleaseStrategy 的实现:SimpleSequenceSizeReleaseStrategy。 此实现会检查每条到达消息的 SEQUENCE_NUMBERSEQUENCE_SIZE 头,以决定何时完成消息组并准备好进行聚合。 如前所示,它也是默认策略。

在 5.0 版本之前,默认的释放策略是 SequenceSizeReleaseStrategy,它在大型组中表现不佳。 使用该策略,会检测并拒绝重复的序列号。 此操作可能代价高昂。

如果您正在聚合大型组,不需要释放部分组,也不需要检测/拒绝重复序列,请考虑使用 SimpleSequenceSizeReleaseStrategy —— 它对于这些用例效率更高,并且自 5.0 版本 以来在未指定部分组释放时是默认值。

聚合大型组

4.3 版本将 SimpleMessageGroup 中消息的默认 Collection 更改为 HashSet(以前是 BlockingQueue)。 从大型组中删除单个消息时,这代价高昂(需要 O(n) 线性扫描)。 尽管哈希集通常删除速度快得多,但对于大型消息可能代价高昂,因为必须在插入和删除时计算哈希。 如果您的消息哈希成本很高,请考虑使用其他集合类型。 如 使用 MessageGroupFactory 中所述,提供了 SimpleMessageGroupFactory,以便您可以选择最适合您需求的 Collection。 您还可以提供自己的工厂实现来创建其他 Collection<Message<?>>

以下示例演示了如何使用先前的实现和 SimpleSequenceSizeReleaseStrategy 配置聚合器:

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />

如果筛选器端点参与聚合器上游的流,则序列大小释放策略(固定或基于 sequenceSize 头)将无法实现其目的,因为序列中的某些消息可能会被筛选器丢弃。 在这种情况下,建议选择另一个 ReleaseStrategy,或者使用从丢弃子流发送的补偿消息,其内容中包含一些信息,以便在自定义完整组函数中跳过。 有关更多信息,请参阅 筛选器

关联策略

CorrelationStrategy 接口定义如下:

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

该方法返回一个 Object,表示用于将消息与消息组关联的关联键。 该键必须满足 Map 中键的条件,即 equals()hashCode() 的实现。

通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法参数的规则与 ServiceActivator 相同(包括对 @Header 注解的支持)。 该方法必须返回一个值,并且该值不能为 null

Spring Integration 提供了 CorrelationStrategy 的实现:HeaderAttributeCorrelationStrategy。 此实现返回消息头之一的值(其名称由构造函数参数指定)作为关联键。 默认情况下,关联策略是一个 HeaderAttributeCorrelationStrategy,它返回 CORRELATION_ID 头属性的值。 如果您有要用于关联的自定义头名称,则可以在 HeaderAttributeCorrelationStrategy 实例上配置它,并将其作为聚合器关联策略的引用。

锁注册表

对组的更改是线程安全的。 因此,当您并发发送相同关联 ID 的消息时,聚合器中只会处理其中一条消息,使其有效地成为 每个消息组单线程LockRegistry 用于获取解析的关联 ID 的锁。 默认情况下使用 DefaultLockRegistry(内存中)。 对于跨服务器同步更新(在使用共享 MessageGroupStore 的情况下),您必须配置一个共享锁注册表。

避免死锁

如上所述,当消息组被修改(添加或释放消息)时,会持有锁。

考虑以下流程:

...->aggregator1-> ... ->aggregator2-> ...

如果存在多个线程,并且聚合器共享一个公共锁注册表,则可能会发生死锁。 这将导致线程挂起,jstack <pid> 可能会显示如下结果:

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"

有几种方法可以避免此问题:

  • 确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或更多聚合器必须各自具有不同的注册表)

  • 使用 ExecutorChannelQueueChannel 作为聚合器的输出通道,以便下游流在新线程上运行

  • 从版本 5.1.1 开始,将 releaseLockBeforeSend 聚合器属性设置为 true

如果由于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也会导致此问题。 当然,上述第一个解决方案在这种情况下不适用。

在 Java DSL 中配置聚合器

有关如何在 Java DSL 中配置聚合器,请参阅 聚合器和重排序器

使用 XML 配置聚合器

Spring Integration 通过 <aggregator/> 元素支持聚合器的 XML 配置。 以下示例展示了一个聚合器示例:

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"                          [id="CO1-1"]1
        auto-startup="true"                                [id="CO1-2"]2
        input-channel="inputChannel"                       [id="CO1-3"]3
        output-channel="outputChannel"                     [id="CO1-4"]4
        discard-channel="throwAwayChannel"                 [id="CO1-5"]5
        message-store="persistentMessageStore"             [id="CO1-6"]6
        order="1"                                          [id="CO1-7"]7
        send-partial-result-on-expiry="false"              [id="CO1-8"]8
        send-timeout="1000"                                [id="CO1-9"]9

        correlation-strategy="correlationStrategyBean"     [id="CO1-10"]10
        correlation-strategy-method="correlate"            [id="CO1-11"]11
        correlation-strategy-expression="headers['foo']"   [id="CO1-12"]12

        ref="aggregatorBean"                               [id="CO1-13"]13
        method="aggregate"                                 [id="CO1-14"]14

        release-strategy="releaseStrategyBean"             [id="CO1-15"]15
        release-strategy-method="release"                  [id="CO1-16"]16
        release-strategy-expression="size() == 5"          [id="CO1-17"]17

        expire-groups-upon-completion="false"              [id="CO1-18"]18
        empty-group-min-timeout="60000"                    [id="CO1-19"]19

        lock-registry="lockRegistry"                       [id="CO1-20"]20

        group-timeout="60000"                              [id="CO1-21"]21
        group-timeout-expression="size() ge 2 ? 100 : -1"  [id="CO1-22"]22
        expire-groups-upon-timeout="true"                  [id="CO1-23"]23

        scheduler="taskScheduler" >                        [id="CO1-24"]24
            <expire-transactional/>                        [id="CO1-25"]25
            <expire-advice-chain/>                         [id="CO1-26"]26
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 聚合器的 ID 是可选的。
2 生命周期属性,指示聚合器是否应在应用程序上下文启动期间启动。可选(默认值为 'true')。
3 聚合器接收消息的通道。必需。
4 聚合器将聚合结果发送到的通道。可选(因为传入消息本身可以在 'replyChannel' 消息头中指定回复通道)。
5 聚合器将超时消息发送到的通道(如果 send-partial-result-on-expiryfalse)。可选。
6 MessageGroupStore 的引用,用于在其关联键下存储消息组,直到它们完成。可选。默认情况下,它是一个易失性内存存储。有关更多信息,请参阅 消息存储
7 当多个处理器订阅同一个 DirectChannel 时(用于负载均衡目的),此聚合器的顺序。可选。
8 指示过期消息应在包含它们的 MessageGroup 过期后进行聚合并发送到 'output-channel' 或 'replyChannel'(请参阅 MessageGroupStore.expireMessageGroups(long))。使 MessageGroup 过期的一种方法是配置 MessageGroupStoreReaper。但是,您也可以通过调用 MessageGroupStore.expireMessageGroups(timeout) 来使 MessageGroup 过期。您可以通过控制总线操作或如果您引用了 MessageGroupStore 实例,通过调用 expireMessageGroups(timeout) 来实现。否则,此属性本身什么也不做。它仅作为指示,表示是否丢弃或发送到输出或回复通道中 MessageGroup 中即将过期的任何消息。可选(默认值为 false)。注意:此属性可能更恰当地称为 send-partial-result-on-timeout,因为如果 expire-groups-upon-timeout 设置为 false,则组可能实际上不会过期。
9 将回复 Message 发送到 output-channeldiscard-channel 时等待的超时间隔。默认为 30 秒。仅当输出通道有一些“发送”限制时才适用,例如具有固定“容量”的 QueueChannel。在这种情况下,会抛出 MessageDeliveryException。对于 AbstractSubscribableChannel 实现,send-timeout 被忽略。对于 group-timeout(-expression),来自计划过期任务的 MessageDeliveryException 会导致此任务重新计划。可选。
10 对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是 CorrelationStrategy 接口的实现或 POJO。在后一种情况下,还必须定义 correlation-strategy-method 属性。可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头)。
11 correlation-strategy 引用的 bean 上定义的方法。它实现关联决策算法。可选,有限制(correlation-strategy 必须存在)。
12 表示关联策略的 SpEL 表达式。示例:"headers['something']"。只允许 correlation-strategycorrelation-strategy-expression 中的一个。
13 对应用程序上下文中定义的 bean 的引用。该 bean 必须实现聚合逻辑,如前所述。可选(默认情况下,聚合消息列表成为输出消息的有效负载)。
14 ref 属性引用的 bean 上定义的方法。它实现消息聚合算法。可选(取决于 ref 属性是否定义)。
15 对实现释放策略的 bean 的引用。该 bean 可以是 ReleaseStrategy 接口的实现或 POJO。在后一种情况下,还必须定义 release-strategy-method 属性。可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头属性)。
16 release-strategy 属性引用的 bean 上定义的方法。它实现完成决策算法。可选,有限制(release-strategy 必须存在)。
17 表示释放策略的 SpEL 表达式。表达式的根对象是 MessageGroup。示例:"size() == 5"。只允许 release-strategyrelease-strategy-expression 中的一个。
18 当设置为 true(默认值为 false)时,已完成的组将从消息存储中删除,允许具有相同关联的后续消息形成新组。默认行为是将与已完成组具有相同关联的消息发送到 discard-channel
19 仅当为 <aggregator>MessageStore 配置了 MessageGroupStoreReaper 时才适用。默认情况下,当配置 MessageGroupStoreReaper 以使部分组过期时,空组也会被删除。空组在组正常释放后存在。空组可以检测和丢弃迟到的消息。如果您希望空组的过期时间比部分组的过期时间长,请设置此属性。然后,空组不会从 MessageStore 中删除,直到它们在至少此毫秒数内未被修改。请注意,空组的实际过期时间也受收割者的 timeout 属性影响,可能高达此值加上超时。
20 org.springframework.integration.util.LockRegistry bean 的引用。它用于根据 groupId 获取 Lock 以进行 MessageGroup 上的并发操作。默认情况下,使用内部 DefaultLockRegistry。使用分布式 LockRegistry,例如 ZookeeperLockRegistry,可确保只有一个聚合器实例可以并发操作一个组。有关更多信息,请参阅 Redis 锁注册表Zookeeper 锁注册表
21 一个超时(以毫秒为单位),用于在当前消息到达时 ReleaseStrategy 不释放组时强制 MessageGroup 完成。此属性为聚合器提供了一个内置的基于时间的释放策略,当需要发出部分结果(或丢弃组)时,如果新消息在超时时间内没有到达 MessageGroup,则从最后一条消息到达的时间算起。要设置从 MessageGroup 创建时间算起的超时,请参阅 group-timeout-expression 信息。当新消息到达聚合器时,将取消其 MessageGroup 的任何现有 ScheduledFuture<?>。如果 ReleaseStrategy 返回 false(表示不释放)且 groupTimeout > 0,则会安排一个新任务以使组过期。我们不建议将此属性设置为零(或负值)。这样做会有效地禁用聚合器,因为每个消息组都会立即完成。但是,您可以使用表达式有条件地将其设置为零(或负值)。有关信息,请参阅 group-timeout-expression。完成期间采取的操作取决于 ReleaseStrategysend-partial-group-on-expiry 属性。有关更多信息,请参阅 agg-and-group-to。它与 'group-timeout-expression' 属性互斥。
22 SpEL 表达式,评估为 groupTimeout,其中 MessageGroup 作为 #root 评估上下文对象。用于安排 MessageGroup 被强制完成。如果表达式评估为 null,则不安排完成。如果评估为零,则组立即在当前线程上完成。实际上,这提供了一个动态的 group-timeout 属性。例如,如果您希望在组创建后 10 秒强制完成 MessageGroup,您可能需要考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis(),其中 timestampMessageGroup.getTimestamp() 提供,因为 MessageGroup#root 评估上下文对象。但是,请记住,组创建时间可能与第一条到达消息的时间不同,具体取决于其他组过期属性的配置。有关更多信息,请参阅 group-timeout。与 'group-timeout' 属性互斥。
23 当组因超时(或由 MessageGroupStoreReaper)而完成时,默认情况下组会过期(完全删除)。迟到的消息会启动一个新组。您可以将其设置为 false 以完成组,但保留其元数据,以便丢弃迟到的消息。空组以后可以使用 MessageGroupStoreReaperempty-group-min-timeout 属性过期。它默认为 'true'。
24 一个 TaskScheduler bean 引用,用于在 groupTimeout 内没有新消息到达 MessageGroup 时,计划 MessageGroup 被强制完成。如果未提供,则使用 ApplicationContext 中注册的默认调度程序(taskScheduler)(ThreadPoolTaskScheduler)。如果未指定 group-timeoutgroup-timeout-expression,则此属性不适用。
25 从版本 4.1 开始。它允许为 forceComplete 操作启动事务。它由 group-timeout(-expression)MessageGroupStoreReaper 启动,不适用于正常的 addreleasediscard 操作。只允许此子元素或 <expire-advice-chain/>
26 4.1 版本 开始。它允许为 forceComplete 操作配置任何 Advice。它由 group-timeout(-expression)MessageGroupStoreReaper 启动,不适用于正常的 addreleasediscard 操作。只允许此子元素或 <expire-transactional/>。也可以使用 Spring tx 命名空间在此处配置事务 Advice
Example 1. 使组过期

有两个与使组过期(完全删除)相关的属性。 当一个组过期时,没有它的记录,如果一条新消息以相同的关联到达,则会启动一个新组。 当一个组完成(未过期)时,空组仍然存在,并且迟到的消息将被丢弃。 空组以后可以使用 MessageGroupStoreReaper 结合 empty-group-min-timeout 属性删除。 expire-groups-upon-completionReleaseStrategy 释放组时的“正常”完成有关。 这默认为 false。 如果一个组没有正常完成,而是由于超时而释放或丢弃,则该组通常会过期。 从版本 4.1 开始,您可以使用 expire-groups-upon-timeout 控制此行为。 为了向后兼容,它默认为 true

当一个组超时时,ReleaseStrategy 会再获得一次机会来释放该组。 如果它这样做并且 expire-groups-upon-timeout 为 false,则过期由 expire-groups-upon-completion 控制。 如果释放策略在超时期间仍未释放该组,则该组将过期。 超时组要么被丢弃,要么发生部分释放(基于 send-partial-result-on-expiry)。

从版本 5.0 开始,空组也会在 empty-group-min-timeout 后安排删除。 如果 expireGroupsUponCompletion == falseminimumTimeoutForEmptyGroups > 0,则在正常或部分序列释放发生时,会安排删除组的任务。 从版本 5.4 开始,聚合器(和重排序器)可以配置为过期孤立组(持久消息存储中可能不会被释放的组)。 expireTimeout(如果大于 0)表示应清除存储中早于此值的组。 purgeOrphanedGroups() 方法在启动时调用,并且与提供的 expireDuration 一起,在计划任务中定期调用。 此方法也可以随时从外部调用。 过期逻辑根据上述提供的过期选项完全委托给 forceComplete(MessageGroup) 功能。 这种定期清除功能在需要从那些不再通过常规消息到达逻辑释放的旧组中清理消息存储时很有用。 在大多数情况下,当使用持久消息组存储时,这发生在应用程序重新启动后。 该功能类似于带有计划任务的 MessageGroupStoreReaper,但当使用组超时而不是收割器时,它提供了一种在特定组件中处理旧组的便捷方式。 MessageGroupStore 必须专门为当前的关联端点提供。 否则,一个聚合器可能会清除另一个聚合器中的组。 对于聚合器,使用此技术过期的组将根据 expireGroupsUponCompletion 属性被丢弃或作为部分组释放。

如果自定义聚合器处理程序实现在其他 <aggregator> 定义中被引用,我们通常建议使用 ref 属性。 但是,如果自定义聚合器实现仅由单个 <aggregator> 定义使用,则可以使用内部 bean 定义(从版本 1.0.3 开始)在 <aggregator> 元素中配置聚合 POJO,如以下示例所示:

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>

在同一个 <aggregator> 配置中同时使用 ref 属性和内部 bean 定义是不允许的,因为它会创建歧义条件。 在这种情况下,会抛出异常。

以下示例显示了聚合器 bean 的实现:

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0l;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }
}

前面示例的完成策略 bean 的实现可能如下:

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    int sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}

在有意义的地方,释放策略方法和聚合器方法可以组合成一个 bean。

前面示例的关联策略 bean 的实现可能如下:

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

前面示例中的聚合器将根据某个标准(在此例中为除以十后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字之和超过某个值。

在有意义的地方,释放策略方法、关联策略方法和聚合器方法可以组合在一个 bean 中。 (实际上,所有这些或其中任何两个都可以组合。)

聚合器和 Spring 表达式语言 (SpEL)

从 Spring Integration 2.0 开始,您可以使用 SpEL 处理各种策略(关联、释放和聚合),如果此类释放策略背后的逻辑相对简单,我们建议使用它。 假设您有一个遗留组件,它被设计为接收对象数组。 我们知道默认的释放策略将所有聚合消息组装到 List 中。 现在我们有两个问题。 首先,我们需要从列表中提取单个消息。 其次,我们需要提取每条消息的有效负载并组装对象数组。 以下示例解决了这两个问题:

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<String>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

然而,使用 SpEL,这样的要求实际上可以通过一行表达式相对容易地处理,从而省去了编写自定义类并将其配置为 bean 的麻烦。 以下示例演示了如何实现:

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

在前面的配置中,我们使用 集合投影 表达式从列表中所有消息的有效负载中组装一个新集合,然后将其转换为数组,从而实现与早期 Java 代码相同的结果。

在处理自定义释放和关联策略时,您可以应用相同的基于表达式的方法。

您无需在 correlation-strategy 属性中为自定义 CorrelationStrategy 定义 bean,而是可以将简单的关联逻辑实现为 SpEL 表达式,并在 correlation-strategy-expression 属性中配置它,如以下示例所示:

correlation-strategy-expression="payload.person.id"

在前面的示例中,我们假设有效负载具有一个 person 属性,其中包含一个 id,该 id 将用于关联消息。

同样,对于 ReleaseStrategy,您可以将释放逻辑实现为 SpEL 表达式,并在 release-strategy-expression 属性中配置它。 评估上下文的根对象是 MessageGroup 本身。 消息 List 可以通过表达式中组的 message 属性引用。

在 5.0 版本之前的版本中,根对象是 Message<?> 的集合,如前面的示例所示:

release-strategy-expression="!messages.?[payload==5].empty"

在前面的示例中,SpEL 评估上下文的根对象是 MessageGroup 本身,您表示一旦此组中有一条有效负载为 5 的消息,就应该释放该组。

聚合器和组超时

从版本 4.0 开始,引入了两个新的互斥属性:group-timeoutgroup-timeout-expression。 请参阅 aggregator-xml。 在某些情况下,如果 ReleaseStrategy 在当前消息到达时未释放,您可能需要在超时后发出聚合器结果(或丢弃组)。 为此,groupTimeout 选项允许计划 MessageGroup 被强制完成,如以下示例所示:

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

在此示例中,如果聚合器接收到序列中的最后一条消息(由 release-strategy-expression 定义),则可以进行正常释放。 如果该特定消息未到达,则 groupTimeout 会在十秒后强制组完成,前提是该组包含至少两条消息。

强制组完成的结果取决于 ReleaseStrategysend-partial-result-on-expiry。 首先,再次咨询释放策略,看是否进行正常释放。 虽然组没有改变,但 ReleaseStrategy 此时可以决定释放组。 如果释放策略在超时期间仍未释放组,则组将过期。 如果 send-partial-result-on-expirytrue,则 MessageGroup 中现有消息(部分)将作为正常聚合器回复消息释放到 output-channel。 否则,它将被丢弃。

groupTimeout 行为与 MessageGroupStoreReaper 之间存在差异(请参阅 aggregator-xml)。 收割者定期启动 MessageGroupStore 中所有 MessageGroup 的强制完成。 如果新消息在 groupTimeout 期间没有到达,groupTimeout 会对每个 MessageGroup 单独执行此操作。 此外,收割者可用于删除空组(那些为了丢弃迟到的消息而保留的组,如果 expire-groups-upon-completion 为 false)。

从版本 5.5 开始,groupTimeoutExpression 可以评估为 java.util.Date 实例。 这在根据组创建时间 (MessageGroup.getTimestamp()) 而不是当前消息到达时间(当 groupTimeoutExpression 评估为 long 时计算)确定计划任务时刻的情况下很有用:

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"

使用注解配置聚合器

以下示例显示了一个使用注解配置的聚合器:

public class Waiter {
  ...

  @Aggregator  [id="CO2-1"]1
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  [id="CO2-2"]2
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  [id="CO2-3"]3
  public String correlateBy(OrderItem item) {
    ...
  }
}
1 指示此方法应作为聚合器的注解。如果此类的作用是聚合器,则必须指定此注解。
2 指示此方法用作聚合器的释放策略的注解。如果任何方法上不存在,聚合器将使用 SimpleSequenceSizeReleaseStrategy
3 指示此方法应作为聚合器关联策略的注解。如果未指示关联策略,聚合器将使用基于 CORRELATION_IDHeaderAttributeCorrelationStrategy

XML 元素提供的所有配置选项也适用于 @Aggregator 注解。

聚合器可以从 XML 显式引用,或者如果 @MessageEndpoint 定义在类上,则通过类路径扫描自动检测。

聚合器组件的注解配置(@Aggregator 等)仅涵盖简单用例,其中大多数默认选项就足够了。 如果您在使用注解配置时需要对这些选项进行更多控制,请考虑为 AggregatingMessageHandler 使用 @Bean 定义,并将其 @Bean 方法标记为 @ServiceActivator,如以下示例所示:

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}

有关更多信息,请参阅 aggregator-api@Bean 方法上使用注解

从版本 4.2 开始,AggregatorFactoryBean 可用于简化 AggregatingMessageHandler 的 Java 配置。

管理聚合器中的状态:MessageGroupStore

聚合器(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它需要根据一段时间内到达的、具有相同关联键的一组消息做出决策。 有状态模式中接口的设计(例如 ReleaseStrategy)遵循的原则是,组件(无论是框架定义还是用户定义)都应该能够保持无状态。 所有状态都由 MessageGroup 携带,其管理委托给 MessageGroupStoreMessageGroupStore 接口定义如下:

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

有关更多信息,请参阅 Javadoc

MessageGroupStore 在等待释放策略触发时累积 MessageGroups 中的状态信息,并且该事件可能永远不会发生。 因此,为了防止陈旧消息滞留,并且为了易失性存储在应用程序关闭时提供清理挂钩,MessageGroupStore 允许您注册回调,以便在 MessageGroups 过期时应用于它们。 该接口非常简单,如以下清单所示:

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

回调直接访问存储和消息组,以便它可以管理持久状态(例如,通过完全从存储中删除组)。

MessageGroupStore 维护这些回调的列表,它根据需要将这些回调应用于所有时间戳早于作为参数提供的时间的消息(请参阅前面描述的 registerMessageGroupExpiryCallback(..)expireMessageGroups(..) 方法)。

重要的是不要在不同的聚合器组件中使用相同的 MessageGroupStore 实例,当您打算依赖 expireMessageGroups 功能时。 每个 AbstractCorrelatingMessageHandler 都注册自己的基于 forceComplete() 回调的 MessageGroupCallback。 这样,每个过期组都可能被错误的聚合器完成或丢弃。 从版本 5.0.10 开始,AbstractCorrelatingMessageHandlerMessageGroupStore 中的注册回调使用了 UniqueExpiryCallbackMessageGroupStore 反过来检查此类的实例是否存在,如果回调集中已存在一个实例,则记录一个错误并显示适当的消息。 这样,框架禁止在不同的聚合器/重排序器中使用 MessageGroupStore 实例,以避免上述过期组的副作用,这些组不是由特定的关联处理程序创建的。

您可以调用 expireMessageGroups 方法并提供一个超时值。 任何早于当前时间减去此值的消息都将过期并应用回调。 因此,存储的用户定义了消息组“过期”的含义。

为了方便用户,Spring Integration 以 MessageGroupStoreReaper 的形式提供了消息过期的包装器,如以下示例所示:

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

收割者是一个 Runnable。 在前面的示例中,消息组存储的过期方法每十秒调用一次。 超时本身是 30 秒。

重要的是要理解 MessageGroupStoreReaper 的“timeout”属性是一个近似值,并且受任务调度程序速率的影响,因为此属性仅在 MessageGroupStoreReaper 任务的下一次计划执行时检查。 例如,如果超时设置为十分钟,但 MessageGroupStoreReaper 任务计划每小时运行一次,并且 MessageGroupStoreReaper 任务的最后一次执行发生在超时前一分钟,则 MessageGroup 在接下来的 59 分钟内不会过期。 因此,我们建议将速率设置为至少等于超时值或更短。

除了收割者之外,当应用程序通过 AbstractCorrelatingMessageHandler 中的生命周期回调关闭时,也会调用过期回调。

AbstractCorrelatingMessageHandler 注册自己的过期回调,这与聚合器 XML 配置中的布尔标志 send-partial-result-on-expiry 相关联。 如果该标志设置为 true,则当调用过期回调时,尚未释放的组中任何未标记的消息都可以发送到输出通道。

由于 MessageGroupStoreReaper 是从计划任务中调用的,并且可能导致生成消息(取决于 sendPartialResultOnExpiry 选项)到下游集成流,因此建议提供一个带有 MessagePublishingErrorHandler 的自定义 TaskScheduler,以便通过 errorChannel 处理异常,正如常规聚合器释放功能所期望的那样。 同样的逻辑也适用于组超时功能,它也依赖于 TaskScheduler。 有关更多信息,请参阅 错误处理

当共享 MessageStore 用于不同的关联端点时,您必须配置一个适当的 CorrelationStrategy 以确保组 ID 的唯一性。 否则,当一个关联端点释放或过期来自其他端点的消息时,可能会发生意外行为。 具有相同关联键的消息存储在同一个消息组中。 一些 MessageStore 实现允许通过分区数据来使用相同的物理资源。 例如,JdbcMessageStore 具有 region 属性,MongoDbMessageStore 具有 collectionName 属性。 有关 MessageStore 接口及其实现的更多信息,请参阅 消息存储

Flux 聚合器

在版本 5.2 中,引入了 FluxAggregatorMessageHandler 组件。 它基于 Project Reactor 的 Flux.groupBy()Flux.window() 运算符。 传入消息被发送到此组件构造函数中由 Flux.create() 启动的 FluxSink。 如果未提供 outputChannel 或它不是 ReactiveStreamsSubscribableChannel 的实例,则主 Flux 的订阅是从 Lifecycle.start() 实现完成的。 否则,它将推迟到由 ReactiveStreamsSubscribableChannel 实现完成的订阅。 消息通过 Flux.groupBy() 使用 CorrelationStrategy 进行分组,以获取组键。 默认情况下,会查询消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID 头。

默认情况下,每个关闭的窗口都作为 Flux 释放到要生成的消息的有效负载中。 此消息包含窗口中第一条消息的所有头。 输出消息有效负载中的此 Flux 必须在下游订阅和处理。 此逻辑可以通过 FluxAggregatorMessageHandlersetCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) 配置选项进行自定义(或覆盖)。 例如,如果我们希望在最终消息中包含 List 有效负载,我们可以这样配置 Flux.collectList()

fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));

FluxAggregatorMessageHandler 中有几个选项可以选择合适的窗口策略:

  • setBoundaryTrigger(Predicate<Message<?>>) - 传播到 Flux.windowUntil() 运算符。有关更多信息,请参阅其 JavaDocs。优先于所有其他窗口选项。

  • setWindowSize(int)setWindowSizeFunction(Function<Message<?>, Integer>) - 传播到 Flux.window(int)windowTimeout(int, Duration),具体取决于窗口大小配置。

  • setWindowTimespan(Duration) - 传播到 Flux.window(Duration)windowTimeout(int, Duration),具体取决于窗口大小配置。

  • setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>) - 一个函数,用于对分组的 Flux 进行转换,以实现未涵盖在公开选项中的任何自定义窗口操作。

由于此组件是 MessageHandler 实现,因此可以简单地将其用作 @Bean 定义以及 @ServiceActivator 消息注解。 使用 Java DSL,可以从 .handle() EIP 方法中使用它。 以下示例演示了我们如何在运行时注册 IntegrationFlow 以及 FluxAggregatorMessageHandler 如何与上游拆分器关联:

IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

消息组条件

从版本 5.5 开始,AbstractCorrelatingMessageHandler(包括其 Java 和 XML DSL)公开了 BiFunction<Message<?>, String, String> 实现的 groupConditionSupplier 选项。 此函数用于添加到组中的每条消息,并将结果条件语句存储到组中以供将来考虑。 ReleaseStrategy 可以咨询此条件,而不是遍历组中的所有消息。 有关更多信息,请参阅 GroupConditionProvider JavaDocs 和 消息组条件

另请参阅 文件聚合器