重新排序器

重新排序器与聚合器相关,但用途不同。 聚合器组合消息,而重新排序器则不改变消息地传递它们。

功能

重新排序器的工作方式与聚合器类似,因为它使用 CORRELATION_ID 将消息分组存储。 不同之处在于,重新排序器不以任何方式处理消息。 相反,它按照 SEQUENCE_NUMBER 标头值的顺序释放它们。

在这方面,您可以选择一次性释放所有消息(在整个序列之后,根据 SEQUENCE_SIZE 和其他可能性),或者一旦有效的序列可用就立即释放。 (本章稍后将介绍“有效序列”的含义。)

重新排序器旨在重新排序相对较短的、具有小间隙的消息序列。 如果存在大量不连续的、具有许多间隙的序列,则可能会遇到性能问题。

配置重新排序器

有关在 Java DSL 中配置重新排序器的信息,请参阅 聚合器和重新排序器

配置重新排序器只需要在 XML 中包含适当的元素。

以下示例显示了重新排序器的配置:

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  [id="CO1-1"]1
  input-channel="inputChannel"  [id="CO1-2"]2
  output-channel="outputChannel"  [id="CO1-3"]3
  discard-channel="discardChannel"  [id="CO1-4"]4
  release-partial-sequences="true"  [id="CO1-5"]5
  message-store="messageStore"  [id="CO1-6"]6
  send-partial-result-on-expiry="true"  [id="CO1-7"]7
  send-timeout="86420000"  [id="CO1-8"]8
  correlation-strategy="correlationStrategyBean"  [id="CO1-9"]9
  correlation-strategy-method="correlate"  [id="CO1-10"]10
  correlation-strategy-expression="headers['something']"  [id="CO1-11"]11
  release-strategy="releaseStrategyBean"  [id="CO1-12"]12
  release-strategy-method="release"  [id="CO1-13"]13
  release-strategy-expression="size() == 10"  [id="CO1-14"]14
  empty-group-min-timeout="60000"  [id="CO1-15"]15

  lock-registry="lockRegistry"  [id="CO1-16"]16

  group-timeout="60000"  [id="CO1-17"]17
  group-timeout-expression="size() ge 2 ? 100 : -1"  [id="CO1-18"]18
  scheduler="taskScheduler" />  [id="CO1-19"]19
  expire-group-upon-timeout="false" />  [id="CO1-20"]20
 <1> 重新排序器的 ID 是可选的。
 <1> 重新排序器的输入通道。
必填。
 <1> 重新排序器将重新排序的消息发送到的通道。
可选。
 <1> 重新排序器将超时的消息发送到的通道(如果 `send-partial-result-on-timeout` 设置为 `false`)。
可选。
 <1> 是否在有序序列可用时立即发送,或者仅在整个消息组到达后发送。
可选。
(默认值为 `false`。)
 <1> 引用一个 `MessageGroupStore`,可用于在其关联键下存储消息组,直到它们完整。
可选。
(默认是易失性内存存储。)
 <1> 组过期时,是否应发送有序组(即使缺少某些消息)。
可选。
(默认值为 false。)
请参阅 xref:aggregator.adoc#reaper[聚合器中的状态管理:`MessageGroupStore`]。
 <1> 当发送回复 `Message` 到 `output-channel` 或 `discard-channel` 时等待的超时间隔。
它仅在输出通道具有某些“发送”限制(例如具有固定“容量”的 `QueueChannel`)时应用。
在这种情况下,会抛出 `MessageDeliveryException`。
对于 `AbstractSubscribableChannel` 实现,`send-timeout` 被忽略。
对于 `group-timeout(-expression)`,来自计划的过期任务的 `MessageDeliveryException` 会导致此任务重新安排。
可选。
 <1> 引用实现消息关联(分组)算法的 bean。
该 bean 可以是 `CorrelationStrategy` 接口的实现或 POJO。
在后一种情况下,还必须定义 `correlation-strategy-method` 属性。
可选。
(默认情况下,聚合器使用 `IntegrationMessageHeaderAccessor.CORRELATION_ID` 标头。)
 <1> 在 `correlation-strategy` 引用的 bean 上定义的方法,用于实现关联决策算法。
可选,但有限制(需要存在 `correlation-strategy`)。
 <1> 表示关联策略的 SpEL 表达式。
示例:`"headers['something']"`。
`correlation-strategy` 或 `correlation-strategy-expression` 只能有一个。
 <1> 引用实现释放策略的 bean。
该 bean 可以是 `ReleaseStrategy` 接口的实现或 POJO。
在后一种情况下,还必须定义 `release-strategy-method` 属性。
可选(默认情况下,聚合器将使用 `IntegrationMessageHeaderAccessor.SEQUENCE_SIZE` 标头属性)。
 <1> 在 `release-strategy` 引用的 bean 上定义的方法,用于实现完成决策算法。
可选,但有限制(需要存在 `release-strategy`)。
 <1> 表示释放策略的 SpEL 表达式。
表达式的根对象是 `MessageGroup`。
示例:`"size() == 5"`。
`release-strategy` 或 `release-strategy-expression` 只能有一个。
 <1> 仅当为 `<resequencer>` `MessageStore` 配置了 `MessageGroupStoreReaper` 时才适用。
默认情况下,当配置 `MessageGroupStoreReaper` 以使部分组过期时,空组也会被删除。
在组正常释放后,空组仍然存在。
这是为了能够检测和丢弃延迟到达的消息。
如果您希望空组的过期计划比部分组的过期计划更长,请设置此属性。
然后,在空组至少在此毫秒数内未被修改之前,它们不会从 `MessageStore` 中删除。
请注意,空组的实际过期时间也受收割者的超时属性影响,并且可能高达此值加上超时。
 <1> 请参阅 xref:aggregator.adoc#aggregator-xml[使用 XML 配置聚合器]。
 <1> 请参阅 xref:aggregator.adoc#aggregator-xml[使用 XML 配置聚合器]。
 <1> 请参阅 xref:aggregator.adoc#aggregator-xml[使用 XML 配置聚合器]。
 <1> 请参阅 xref:aggregator.adoc#aggregator-xml[使用 XML 配置聚合器]。
 <1> 默认情况下,当组因超时(或由 `MessageGroupStoreReaper`)而完成时,空组的元数据会保留。
延迟到达的消息会立即被丢弃。
将其设置为 `true` 以完全删除组。
然后,延迟到达的消息会启动一个新组,并且在组再次超时之前不会被丢弃。
由于序列范围中的“空洞”导致超时,新组永远不会正常释放。
可以通过使用 `MessageGroupStoreReaper` 和 `empty-group-min-timeout` 属性来使空组稍后过期(完全删除)。
从 5.0 版本开始,空组也会在 `empty-group-min-timeout` 过去后被调度删除。
默认值为 'false'。

另请参阅 聚合器过期组 以获取更多信息。

由于重新排序器在 Java 类中没有要实现的自定义行为,因此它没有注解支持。