Resequencer
重排序器与聚合器有关,但用途不同。聚合器合并消息,而重排序器将消息传递而不改变它们。
The resequencer is related to the aggregator but serves a different purpose. While the aggregator combines messages, the resequencer passes messages through without changing them.
Functionality
重排序器的运作方式类似于聚合器,从某种意义上说,它使用 CORRELATION_ID
将消息存储在组中。不同的是,重排序器不会以任何方式处理消息。相反,它会按照 SEQUENCE_NUMBER
标头值的顺序释放它们。
The resequencer works in a similar way to the aggregator, in the sense that it uses the CORRELATION_ID
to store messages in groups.
The difference is that the Resequencer does not process the messages in any way.
Instead, it releases them in the order of their SEQUENCE_NUMBER
header values.
有鉴于此,你可以选择一次释放所有消息(在整个序列之后,根据 SEQUENCE_SIZE
,以及其他可能性),或者在可用有效序列后立即释放。(我们将在本章后面介绍“有效序列”的含义。)
With respect to that, you can opt to release all messages at once (after the whole sequence, according to the SEQUENCE_SIZE
, and other possibilities) or as soon as a valid sequence is available.
(We cover what we mean by "a valid sequence" later in this chapter.)
重新排序器旨在对短序列消息序列进行重新排序,这些消息序列的间隙很小。如果您有许多带有很多间隙的不相干序列,则可能会遇到性能问题。
The resequencer is intended to resequence relatively short sequences of messages with small gaps. If you have a large number of disjoint sequences with many gaps, you may experience performance issues.
Configuring a Resequencer
有关在 Java DSL 中配置重新排序器的信息,请参阅 Aggregators and Resequencers。
See Aggregators and Resequencers for configuring a resequencer in Java DSL.
配置重排序器只需要在 XML 中包含适当的元素。
Configuring a resequencer requires only including the appropriate element in XML.
以下示例显示了重排序器配置:
The following example shows a resequencer configuration:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" 1
input-channel="inputChannel" 2
output-channel="outputChannel" 3
discard-channel="discardChannel" 4
release-partial-sequences="true" 5
message-store="messageStore" 6
send-partial-result-on-expiry="true" 7
send-timeout="86420000" 8
correlation-strategy="correlationStrategyBean" 9
correlation-strategy-method="correlate" 10
correlation-strategy-expression="headers['something']" 11
release-strategy="releaseStrategyBean" 12
release-strategy-method="release" 13
release-strategy-expression="size() == 10" 14
empty-group-min-timeout="60000" 15
lock-registry="lockRegistry" 16
group-timeout="60000" 17
group-timeout-expression="size() ge 2 ? 100 : -1" 18
scheduler="taskScheduler" /> 19
expire-group-upon-timeout="false" /> 20
1 | 重新排序器的 ID 是可选的。 |
2 | The id of the resequencer is optional. |
3 | 重新排序器的输入通道。必须的。 |
4 | The input channel of the resequencer. Required. |
5 | 重新排序器向其发送重新排序的消息的通道。可选的。 |
6 | The channel to which the resequencer sends the reordered messages. Optional. |
7 | 重新排序器向其发送超时消息的通道(如果 send-partial-result-on-timeout`设置为 `false )。可选的。 |
8 | The channel to which the resequencer sends the messages that timed out (if send-partial-result-on-timeout is set to false ).
Optional. |
9 | 是否在已用序列可用后立即将其发送出去,或者仅在整个消息组到达后才能将其发送出去。可选的。(默认值为 false )。 |
10 | Whether to send out ordered sequences as soon as they are available or only after the whole message group arrives.
Optional.
(The default is false .) |
11 | `MessageGroupStore`的引用,可用于将消息组存储在其关联键下,直到它们完成。可选的。(默认值是易失性内存中存储)。 |
12 | A reference to a MessageGroupStore that can be used to store groups of messages under their correlation key until they are complete.
Optional.
(The default is a volatile in-memory store.) |
13 | 分组到期后,是否应该发送有序分组(即使某些消息尚缺失)。可选的。(默认值为 false)。参见 Managing State in an Aggregator: MessageGroupStore 。 |
14 | Whether, upon the expiration of the group, the ordered group should be sent out (even if some messages are missing).
Optional.
(The default is false.)
See Managing State in an Aggregator: MessageGroupStore . |
15 | 在向 Message`或 `discard-channel`发送答复 `output-channel`时等待的超时间隔。它仅适用于输出通道具有一些“发送”限制的情况,例如“容量”固定的 `QueueChannel 。在这种情况下,会抛出 MessageDeliveryException`异常。`send-timeout`对于 `AbstractSubscribableChannel`实现将被忽略。对于 `group-timeout(-expression) ,来自计划的已过期任务的 `MessageDeliveryException`将导致对该任务进行重新计划。可选的。 |
16 | The timeout interval to wait when sending a reply Message to the output-channel or discard-channel .
It is applied only if the output channel has some 'sending' limitations, such as a QueueChannel with a fixed 'capacity'.
In this case, a MessageDeliveryException is thrown.
The send-timeout is ignored for AbstractSubscribableChannel implementations.
For group-timeout(-expression) , the MessageDeliveryException from the scheduled expired task leads this task to be rescheduled.
Optional. |
17 | 实现消息关联(分组)算法的 Bean 的引用。Bean 可以是 `CorrelationStrategy`接口或 POJO 的实现。后一种情况下,还必须定义 `correlation-strategy-method`属性。可选的。(默认情况下,聚合器使用 `IntegrationMessageHeaderAccessor.CORRELATION_ID`头)。 |
18 | A reference to a bean that implements the message correlation (grouping) algorithm.
The bean can be an implementation of the CorrelationStrategy interface or a POJO.
In the latter case, the correlation-strategy-method attribute must also be defined.
Optional.
(By default, the aggregator uses the IntegrationMessageHeaderAccessor.CORRELATION_ID header.) |
19 | 在 correlation-strategy`引用的 Bean 上定义的方法,该方法实现关联决策算法。可选择,有限制(需要存在 `correlation-strategy )。 |
20 | A method that is defined on the bean referenced by correlation-strategy and that implements the correlation decision algorithm.
Optional, with restrictions (requires correlation-strategy to be present). |
21 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。只允许 `correlation-strategy`或 `correlation-strategy-expression`中的一个存在。 |
22 | A SpEL expression representing the correlation strategy.
Example: "headers['something']" .
Only one of correlation-strategy or correlation-strategy-expression is allowed. |
23 | 实现释放策略的 Bean 的引用。Bean 可以是 `ReleaseStrategy`接口或 POJO 的实现。后一种情况下,还必须定义 `release-strategy-method`属性。可选的(默认情况下,聚合器将使用 `IntegrationMessageHeaderAccessor.SEQUENCE_SIZE`头属性)。 |
24 | A reference to a bean that implements the release strategy.
The bean can be an implementation of the ReleaseStrategy interface or a POJO.
In the latter case, the release-strategy-method attribute must also be defined.
Optional (by default, the aggregator will use the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header attribute). |
25 | 在 release-strategy`引用的 Bean 上定义的方法,该方法实现完成决策算法。可选择,有限制(需要存在 `release-strategy )。 |
26 | A method that is defined on the bean referenced by release-strategy and that implements the completion decision algorithm.
Optional, with restrictions (requires release-strategy to be present). |
27 | 表示释放策略的 SpEL 表达式。表达式的根对象是一个 MessageGroup 。示例:"size() == 5" 。只允许 `release-strategy`或 `release-strategy-expression`中的一个存在。 |
28 | A SpEL expression representing the release strategy.
The root object for the expression is a MessageGroup .
Example: "size() == 5" .
Only one of release-strategy or release-strategy-expression is allowed. |
29 | 仅适用于为 `<resequencer>``MessageStore`配置 `MessageGroupStoreReaper`的情况。默认情况下,当配置 `MessageGroupStoreReaper`使部分组失效时,也会删除空组。空组在正常释放组后存在。这是为了能够检测和丢弃迟到的消息。如果你希望以比已过期部分组更长的计划来使空组失效,请设置此属性。然后,空组直到至少经过此毫秒数的未修改状态后才从 `MessageStore`中删除。请注意,使空组失效的实际时间也受限于清道夫的超时属性,并且可能相当于这个值加上超时时间。 |
30 | Only applies if a MessageGroupStoreReaper is configured for the <resequencer> MessageStore .
By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.
Empty groups exist after a group is released normally.
This is to enable the detection and discarding of late-arriving messages.
If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property.
Empty groups are then not removed from the MessageStore until they have not been modified for at least this number of milliseconds.
Note that the actual time to expire an empty group is also affected by the reaper’s timeout property, and it could be as much as this value plus the timeout. |
31 | 参见 Configuring an Aggregator with XML。 |
32 | See Configuring an Aggregator with XML. |
33 | 参见 Configuring an Aggregator with XML。 |
34 | See Configuring an Aggregator with XML. |
35 | 参见 Configuring an Aggregator with XML。 |
36 | See Configuring an Aggregator with XML. |
37 | 参见 Configuring an Aggregator with XML。 |
38 | See Configuring an Aggregator with XML. |
39 | 默认情况下,当一组因超时(或 MessageGroupStoreReaper )而完成时,保留空组的元数据。立即丢弃迟到的消息。将其设置为 `true`以完全删除该组。然后,迟到的消息启动一个新组,并且在该组再次超时之前不会被丢弃。由于造成超时的序列范围中的 “hole”,新组永远不会正常释放。稍后可以使用 `MessageGroupStoreReaper`连同 `empty-group-min-timeout`属性来使空组失效(完全删除)。从 5.0 版开始,空组在 `empty-group-min-timeout`经过后也会被安排删除。默认值为“false”。 |
40 | By default, when a group is completed due to a timeout (or by a MessageGroupStoreReaper ), the empty group’s metadata is retained.
Late arriving messages are immediately discarded.
Set this to true to remove the group completely.
Then, late arriving messages start a new group and are not be discarded until the group again times out.
The new group is never released normally because of the “hole” in the sequence range that caused the timeout.
Empty groups can be expired (completely removed) later by using a MessageGroupStoreReaper together with the empty-group-min-timeout attribute.
Starting with version 5.0, empty groups are also scheduled for removal after the empty-group-min-timeout elapses.
The default is 'false'. |
有关详细信息,另请参见 Aggregator Expiring Groups。
Also see Aggregator Expiring Groups for more information.
由于没有要在重新排序器的 Java 类中实现的自定义行为,因此不支持对此进行注释。 |
Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for it. |