幂等接收者企业集成模式
从 4.1 版本开始,Spring Integration 提供了 幂等接收者 企业集成模式的实现。
它是一个功能模式,所有的幂等逻辑都应该在应用程序中实现。
然而,为了简化决策过程,提供了 IdempotentReceiverInterceptor
组件。
这是一个 AOP Advice
,应用于 MessageHandler.handleMessage()
方法,并且可以根据其配置 过滤
请求消息或将其标记为 重复
。
以前,您可以通过在 <filter/>
中使用自定义 MessageSelector
来实现此模式(例如,请参阅 过滤器)。
然而,由于此模式真正定义的是端点的行为而不是其本身就是一个端点,因此幂等接收者实现不提供端点组件。
相反,它应用于应用程序中声明的端点。
IdempotentReceiverInterceptor
的逻辑基于提供的 MessageSelector
,如果消息未被该选择器接受,它将通过设置 duplicateMessage
标头为 true
进行丰富。
目标 MessageHandler
(或下游流)可以查询此标头以实现正确的幂等逻辑。
如果 IdempotentReceiverInterceptor
配置了 discardChannel
或 throwExceptionOnRejection = true
,则重复消息不会发送到目标 MessageHandler.handleMessage()
。
相反,它会被丢弃。
如果您希望丢弃(不处理)重复消息,则 discardChannel
应该配置为 NullChannel
,例如默认的 nullChannel
bean。
为了在消息之间维护状态并提供比较消息以实现幂等性的能力,我们提供了 MetadataStoreSelector
。
它接受一个 MessageProcessor
实现(它根据 Message
创建一个查找键)和一个可选的 ConcurrentMetadataStore
(元数据存储)。
有关更多信息,请参阅 MetadataStoreSelector
Javadoc。
您还可以通过使用额外的 MessageProcessor
来自定义 ConcurrentMetadataStore
的 value
。
默认情况下,MetadataStoreSelector
使用 timestamp
消息头。
通常,如果键没有现有值,则选择器会选择消息进行接受。
在某些情况下,比较键的当前值和新值以确定是否应接受消息很有用。
从 5.3 版本开始,提供了 compareValues
属性,它引用一个 BiPredicate<String, String>
;第一个参数是旧值;返回 true
以接受消息并将旧值替换为 MetadataStore
中的新值。
这对于减少键的数量很有用;例如,在处理文件中的行时,您可以将文件名存储在键中,将当前行号存储在值中。
然后,在重新启动后,您可以跳过已处理的行。
有关示例,请参阅 幂等下游处理拆分文件。
为方便起见,MetadataStoreSelector
选项可以直接在 <idempotent-receiver>
组件上配置。
以下列表显示了所有可能的属性:
<idempotent-receiver
id="" [id="CO1-1"]1
endpoint="" [id="CO1-2"]2
selector="" [id="CO1-3"]3
discard-channel="" [id="CO1-4"]4
metadata-store="" [id="CO1-5"]5
key-strategy="" [id="CO1-6"]6
key-expression="" [id="CO1-7"]7
value-strategy="" [id="CO1-8"]8
value-expression="" [id="CO1-9"]9
compare-values="" [id="CO1-10"]10
throw-exception-on-rejection="" /> [id="CO1-11"]11
<1> `IdempotentReceiverInterceptor` bean 的 ID。 可选。 <1> 此拦截器应用于的消费者端点名称或模式。 使用逗号 (`,`) 分隔名称(模式),例如 `endpoint="aaa, bbb*, *ccc, *ddd*, eee*fff"`。 与这些模式匹配的端点 bean 名称随后用于检索目标端点的 `MessageHandler` bean(使用其 `.handler` 后缀),并将 `IdempotentReceiverInterceptor` 应用于这些 bean。 必填。 <1> `MessageSelector` bean 引用。 与 `metadata-store` 和 `key-strategy (key-expression)` 互斥。 当未提供 `selector` 时,`key-strategy` 或 `key-strategy-expression` 之一是必需的。 <1> 当 `IdempotentReceiverInterceptor` 不接受消息时,用于标识将消息发送到的通道。 省略时,重复消息会转发给处理程序,并带有 `duplicateMessage` 标头。 可选。 <1> `ConcurrentMetadataStore` 引用。 由底层 `MetadataStoreSelector` 使用。 与 `selector` 互斥。 可选。 默认的 `MetadataStoreSelector` 使用内部 `SimpleMetadataStore`,它不维护跨应用程序执行的状态。 <1> `MessageProcessor` 引用。 由底层 `MetadataStoreSelector` 使用。 从请求消息中评估 `idempotentKey`。 与 `selector` 和 `key-expression` 互斥。 当未提供 `selector` 时,`key-strategy` 或 `key-strategy-expression` 之一是必需的。 <1> 用于填充 `ExpressionEvaluatingMessageProcessor` 的 SpEL 表达式。 由底层 `MetadataStoreSelector` 使用。 使用请求消息作为评估上下文根对象来评估 `idempotentKey`。 与 `selector` 和 `key-strategy` 互斥。 当未提供 `selector` 时,`key-strategy` 或 `key-strategy-expression` 之一是必需的。 <1> `MessageProcessor` 引用。 由底层 `MetadataStoreSelector` 使用。 从请求消息中评估 `idempotentKey` 的 `value`。 与 `selector` 和 `value-expression` 互斥。 默认情况下,`MetadataStoreSelector` 使用 `timestamp` 消息头作为元数据 `value`。 <1> 用于填充 `ExpressionEvaluatingMessageProcessor` 的 SpEL 表达式。 由底层 `MetadataStoreSelector` 使用。 使用请求消息作为评估上下文根对象来评估 `idempotentKey` 的 `value`。 与 `selector` 和 `value-strategy` 互斥。 默认情况下,`MetadataStoreSelector` 使用 `timestamp` 消息头作为元数据 `value`。 <1> 对 `BiPredicate<String, String>` bean 的引用,它允许您通过比较键的旧值和新值来选择消息;默认为 `null`。 <1> 如果 `IdempotentReceiverInterceptor` 拒绝消息,是否抛出异常。 默认为 `false`。 无论是否提供了 `discard-channel`,它都适用。
对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver
注解。
它用于标记具有消息传递注解(@ServiceActivator
、@Router
等)的 method
,以指定哪些 IdempotentReceiverInterceptor
对象应用于此端点。
以下示例演示了如何使用 @IdempotentReceiver
注解:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当您使用 Java DSL 时,可以将拦截器添加到端点的 advice 链中,如以下示例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
|