幂等接收者企业集成模式

从 4.1 版本开始,Spring Integration 提供了 幂等接收者 企业集成模式的实现。 它是一个功能模式,所有的幂等逻辑都应该在应用程序中实现。 然而,为了简化决策过程,提供了 IdempotentReceiverInterceptor 组件。 这是一个 AOP Advice,应用于 MessageHandler.handleMessage() 方法,并且可以根据其配置 过滤 请求消息或将其标记为 重复

以前,您可以通过在 <filter/> 中使用自定义 MessageSelector 来实现此模式(例如,请参阅 过滤器)。 然而,由于此模式真正定义的是端点的行为而不是其本身就是一个端点,因此幂等接收者实现不提供端点组件。 相反,它应用于应用程序中声明的端点。

IdempotentReceiverInterceptor 的逻辑基于提供的 MessageSelector,如果消息未被该选择器接受,它将通过设置 duplicateMessage 标头为 true 进行丰富。 目标 MessageHandler(或下游流)可以查询此标头以实现正确的幂等逻辑。 如果 IdempotentReceiverInterceptor 配置了 discardChannelthrowExceptionOnRejection = true,则重复消息不会发送到目标 MessageHandler.handleMessage()。 相反,它会被丢弃。 如果您希望丢弃(不处理)重复消息,则 discardChannel 应该配置为 NullChannel,例如默认的 nullChannel bean。

为了在消息之间维护状态并提供比较消息以实现幂等性的能力,我们提供了 MetadataStoreSelector。 它接受一个 MessageProcessor 实现(它根据 Message 创建一个查找键)和一个可选的 ConcurrentMetadataStore (元数据存储)。 有关更多信息,请参阅 MetadataStoreSelector Javadoc。 您还可以通过使用额外的 MessageProcessor 来自定义 ConcurrentMetadataStorevalue。 默认情况下,MetadataStoreSelector 使用 timestamp 消息头。

通常,如果键没有现有值,则选择器会选择消息进行接受。 在某些情况下,比较键的当前值和新值以确定是否应接受消息很有用。 从 5.3 版本开始,提供了 compareValues 属性,它引用一个 BiPredicate<String, String>;第一个参数是旧值;返回 true 以接受消息并将旧值替换为 MetadataStore 中的新值。 这对于减少键的数量很有用;例如,在处理文件中的行时,您可以将文件名存储在键中,将当前行号存储在值中。 然后,在重新启动后,您可以跳过已处理的行。 有关示例,请参阅 幂等下游处理拆分文件

为方便起见,MetadataStoreSelector 选项可以直接在 <idempotent-receiver> 组件上配置。 以下列表显示了所有可能的属性:

<idempotent-receiver
        id=""  [id="CO1-1"]1
        endpoint=""  [id="CO1-2"]2
        selector=""  [id="CO1-3"]3
        discard-channel=""  [id="CO1-4"]4
        metadata-store=""  [id="CO1-5"]5
        key-strategy=""  [id="CO1-6"]6
        key-expression=""  [id="CO1-7"]7
        value-strategy=""  [id="CO1-8"]8
        value-expression=""  [id="CO1-9"]9
        compare-values="" [id="CO1-10"]10
        throw-exception-on-rejection="" />  [id="CO1-11"]11
 <1> `IdempotentReceiverInterceptor` bean 的 ID。
可选。
 <1> 此拦截器应用于的消费者端点名称或模式。
使用逗号 (`,`) 分隔名称(模式),例如 `endpoint="aaa, bbb*, *ccc, *ddd*, eee*fff"`。
与这些模式匹配的端点 bean 名称随后用于检索目标端点的 `MessageHandler` bean(使用其 `.handler` 后缀),并将 `IdempotentReceiverInterceptor` 应用于这些 bean。
必填。
 <1> `MessageSelector` bean 引用。
与 `metadata-store` 和 `key-strategy (key-expression)` 互斥。
当未提供 `selector` 时,`key-strategy` 或 `key-strategy-expression` 之一是必需的。
 <1> 当 `IdempotentReceiverInterceptor` 不接受消息时,用于标识将消息发送到的通道。
省略时,重复消息会转发给处理程序,并带有 `duplicateMessage` 标头。
可选。
 <1> `ConcurrentMetadataStore` 引用。
由底层 `MetadataStoreSelector` 使用。
与 `selector` 互斥。
可选。
默认的 `MetadataStoreSelector` 使用内部 `SimpleMetadataStore`,它不维护跨应用程序执行的状态。
 <1> `MessageProcessor` 引用。
由底层 `MetadataStoreSelector` 使用。
从请求消息中评估 `idempotentKey`。
与 `selector` 和 `key-expression` 互斥。
当未提供 `selector` 时,`key-strategy` 或 `key-strategy-expression` 之一是必需的。
 <1> 用于填充 `ExpressionEvaluatingMessageProcessor` 的 SpEL 表达式。
由底层 `MetadataStoreSelector` 使用。
使用请求消息作为评估上下文根对象来评估 `idempotentKey`。
与 `selector` 和 `key-strategy` 互斥。
当未提供 `selector` 时,`key-strategy` 或 `key-strategy-expression` 之一是必需的。
 <1> `MessageProcessor` 引用。
由底层 `MetadataStoreSelector` 使用。
从请求消息中评估 `idempotentKey` 的 `value`。
与 `selector` 和 `value-expression` 互斥。
默认情况下,`MetadataStoreSelector` 使用 `timestamp` 消息头作为元数据 `value`。
 <1> 用于填充 `ExpressionEvaluatingMessageProcessor` 的 SpEL 表达式。
由底层 `MetadataStoreSelector` 使用。
使用请求消息作为评估上下文根对象来评估 `idempotentKey` 的 `value`。
与 `selector` 和 `value-strategy` 互斥。
默认情况下,`MetadataStoreSelector` 使用 `timestamp` 消息头作为元数据 `value`。
 <1> 对 `BiPredicate<String, String>` bean 的引用,它允许您通过比较键的旧值和新值来选择消息;默认为 `null`。
 <1> 如果 `IdempotentReceiverInterceptor` 拒绝消息,是否抛出异常。
默认为 `false`。
无论是否提供了 `discard-channel`,它都适用。

对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver 注解。 它用于标记具有消息传递注解(@ServiceActivator@Router 等)的 method,以指定哪些 IdempotentReceiverInterceptor 对象应用于此端点。 以下示例演示了如何使用 @IdempotentReceiver 注解:

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

当您使用 Java DSL 时,可以将拦截器添加到端点的 advice 链中,如以下示例所示:

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

IdempotentReceiverInterceptor 仅设计用于 MessageHandler.handleMessage(Message<?>) 方法。 从 4.3.1 版本开始,它实现了 HandleMessageAdvice,以 AbstractHandleMessageAdvice 作为基类,以实现更好的解耦。 有关更多信息,请参阅 处理消息 Advice