重新排序器
重新排序器与聚合器相关,但用途不同。 聚合器组合消息,而重新排序器则不改变消息地传递它们。
功能
重新排序器的工作方式与聚合器类似,因为它使用 CORRELATION_ID
将消息分组存储。
不同之处在于,重新排序器不以任何方式处理消息。
相反,它按照 SEQUENCE_NUMBER
标头值的顺序释放它们。
在这方面,您可以选择一次性释放所有消息(在整个序列之后,根据 SEQUENCE_SIZE
和其他可能性),或者一旦有效的序列可用就立即释放。
(本章稍后将介绍“有效序列”的含义。)
重新排序器旨在重新排序相对较短的、具有小间隙的消息序列。 如果存在大量不连续的、具有许多间隙的序列,则可能会遇到性能问题。
配置重新排序器
有关在 Java DSL 中配置重新排序器的信息,请参阅 聚合器和重新排序器。
配置重新排序器只需要在 XML 中包含适当的元素。
以下示例显示了重新排序器的配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" [id="CO1-1"]1
input-channel="inputChannel" [id="CO1-2"]2
output-channel="outputChannel" [id="CO1-3"]3
discard-channel="discardChannel" [id="CO1-4"]4
release-partial-sequences="true" [id="CO1-5"]5
message-store="messageStore" [id="CO1-6"]6
send-partial-result-on-expiry="true" [id="CO1-7"]7
send-timeout="86420000" [id="CO1-8"]8
correlation-strategy="correlationStrategyBean" [id="CO1-9"]9
correlation-strategy-method="correlate" [id="CO1-10"]10
correlation-strategy-expression="headers['something']" [id="CO1-11"]11
release-strategy="releaseStrategyBean" [id="CO1-12"]12
release-strategy-method="release" [id="CO1-13"]13
release-strategy-expression="size() == 10" [id="CO1-14"]14
empty-group-min-timeout="60000" [id="CO1-15"]15
lock-registry="lockRegistry" [id="CO1-16"]16
group-timeout="60000" [id="CO1-17"]17
group-timeout-expression="size() ge 2 ? 100 : -1" [id="CO1-18"]18
scheduler="taskScheduler" /> [id="CO1-19"]19
expire-group-upon-timeout="false" /> [id="CO1-20"]20
<1> 重新排序器的 ID 是可选的。 <1> 重新排序器的输入通道。 必填。 <1> 重新排序器将重新排序的消息发送到的通道。 可选。 <1> 重新排序器将超时的消息发送到的通道(如果 `send-partial-result-on-timeout` 设置为 `false`)。 可选。 <1> 是否在有序序列可用时立即发送,或者仅在整个消息组到达后发送。 可选。 (默认值为 `false`。) <1> 引用一个 `MessageGroupStore`,可用于在其关联键下存储消息组,直到它们完整。 可选。 (默认是易失性内存存储。) <1> 组过期时,是否应发送有序组(即使缺少某些消息)。 可选。 (默认值为 false。) 请参阅 xref:aggregator.adoc#reaper[聚合器中的状态管理:`MessageGroupStore`]。 <1> 当发送回复 `Message` 到 `output-channel` 或 `discard-channel` 时等待的超时间隔。 它仅在输出通道具有某些“发送”限制(例如具有固定“容量”的 `QueueChannel`)时应用。 在这种情况下,会抛出 `MessageDeliveryException`。 对于 `AbstractSubscribableChannel` 实现,`send-timeout` 被忽略。 对于 `group-timeout(-expression)`,来自计划的过期任务的 `MessageDeliveryException` 会导致此任务重新安排。 可选。 <1> 引用实现消息关联(分组)算法的 bean。 该 bean 可以是 `CorrelationStrategy` 接口的实现或 POJO。 在后一种情况下,还必须定义 `correlation-strategy-method` 属性。 可选。 (默认情况下,聚合器使用 `IntegrationMessageHeaderAccessor.CORRELATION_ID` 标头。) <1> 在 `correlation-strategy` 引用的 bean 上定义的方法,用于实现关联决策算法。 可选,但有限制(需要存在 `correlation-strategy`)。 <1> 表示关联策略的 SpEL 表达式。 示例:`"headers['something']"`。 `correlation-strategy` 或 `correlation-strategy-expression` 只能有一个。 <1> 引用实现释放策略的 bean。 该 bean 可以是 `ReleaseStrategy` 接口的实现或 POJO。 在后一种情况下,还必须定义 `release-strategy-method` 属性。 可选(默认情况下,聚合器将使用 `IntegrationMessageHeaderAccessor.SEQUENCE_SIZE` 标头属性)。 <1> 在 `release-strategy` 引用的 bean 上定义的方法,用于实现完成决策算法。 可选,但有限制(需要存在 `release-strategy`)。 <1> 表示释放策略的 SpEL 表达式。 表达式的根对象是 `MessageGroup`。 示例:`"size() == 5"`。 `release-strategy` 或 `release-strategy-expression` 只能有一个。 <1> 仅当为 `<resequencer>` `MessageStore` 配置了 `MessageGroupStoreReaper` 时才适用。 默认情况下,当配置 `MessageGroupStoreReaper` 以使部分组过期时,空组也会被删除。 在组正常释放后,空组仍然存在。 这是为了能够检测和丢弃延迟到达的消息。 如果您希望空组的过期计划比部分组的过期计划更长,请设置此属性。 然后,在空组至少在此毫秒数内未被修改之前,它们不会从 `MessageStore` 中删除。 请注意,空组的实际过期时间也受收割者的超时属性影响,并且可能高达此值加上超时。 <1> 请参阅 xref:aggregator.adoc#aggregator-xml[使用 XML 配置聚合器]。 <1> 请参阅 xref:aggregator.adoc#aggregator-xml[使用 XML 配置聚合器]。 <1> 请参阅 xref:aggregator.adoc#aggregator-xml[使用 XML 配置聚合器]。 <1> 请参阅 xref:aggregator.adoc#aggregator-xml[使用 XML 配置聚合器]。 <1> 默认情况下,当组因超时(或由 `MessageGroupStoreReaper`)而完成时,空组的元数据会保留。 延迟到达的消息会立即被丢弃。 将其设置为 `true` 以完全删除组。 然后,延迟到达的消息会启动一个新组,并且在组再次超时之前不会被丢弃。 由于序列范围中的“空洞”导致超时,新组永远不会正常释放。 可以通过使用 `MessageGroupStoreReaper` 和 `empty-group-min-timeout` 属性来使空组稍后过期(完全删除)。 从 5.0 版本开始,空组也会在 `empty-group-min-timeout` 过去后被调度删除。 默认值为 'false'。
另请参阅 聚合器过期组 以获取更多信息。
由于重新排序器在 Java 类中没有要实现的自定义行为,因此它没有注解支持。 |