散发-聚集

从 4.1 版本开始,Spring Integration 提供了 散发-聚集 企业集成模式的实现。 它是一个复合端点,其目标是将消息发送给接收者并聚合结果。 正如 企业集成模式 中所述,它是一个适用于“最佳报价”等场景的组件,在此类场景中,我们需要向多个供应商请求信息,并决定哪一个能为我们提供所请求项目的最佳条款。 以前,该模式可以通过使用离散组件进行配置。 此增强功能带来了更便捷的配置。 ScatterGatherHandler 是一个请求-回复端点,它结合了 PublishSubscribeChannel(或 RecipientListRouter)和 AggregatingMessageHandler。 请求消息被发送到 scatter 通道,ScatterGatherHandler 等待聚合器发送到 outputChannel 的回复。

功能

散发-聚集 模式提出了两种场景:“拍卖”和“分发”。 在这两种情况下,聚合 功能是相同的,并提供了 AggregatingMessageHandler 的所有可用选项。 (实际上,ScatterGatherHandler 只要求 AggregatingMessageHandler 作为构造函数参数。) 有关更多信息,请参阅 聚合器

拍卖

拍卖 散发-聚集 变体使用“发布-订阅”逻辑处理请求消息,其中“scatter”通道是一个 PublishSubscribeChannel,带有 apply-sequence="true"。 然而,此通道可以是任何 MessageChannel 实现(正如 ContentEnricher 中的 request-channel 一样——请参阅 内容丰富器)。 但是,在这种情况下,您应该为 聚合 函数创建自己的自定义 correlationStrategy

分发

分发 散发-聚集 变体基于 RecipientListRouter(请参阅 RecipientListRouter),并具有 RecipientListRouter 的所有可用选项。 这是第二个 ScatterGatherHandler 构造函数参数。 如果您只想依赖 recipient-list-routeraggregator 的默认 correlationStrategy,则应指定 apply-sequence="true"。 否则,您应该为 aggregator 提供一个自定义 correlationStrategy。 与 PublishSubscribeChannel 变体(拍卖变体)不同,拥有 recipient-list-router selector 选项允许根据消息过滤目标供应商。 使用 apply-sequence="true",将提供默认的 sequenceSize,并且 aggregator 可以正确释放组。 分发选项与拍卖选项互斥。

applySequence=true 仅适用于基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数配置的纯 Java 配置,因为框架无法改变外部提供的组件。 为了方便起见,从 6.0 版本开始,Scatter-Gather 的 XML 和 Java DSL 将 applySequence 设置为 true。

对于拍卖和分发两种变体,请求(散发)消息都通过 gatherResultChannel 头部进行丰富,以等待来自 聚合器 的回复消息。

默认情况下,所有供应商都应将其结果发送到 replyChannel 头部(通常通过从最终端点省略 output-channel 来实现)。 然而,还提供了 gatherChannel 选项,允许供应商将其回复发送到该通道进行聚合。

配置散发-聚集端点

以下示例展示了 散发-聚集 bean 定义的 Java 配置:

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我们配置了 RecipientListRouter distributor bean,其中 applySequence="true" 和接收者通道列表。 下一个 bean 用于 AggregatingMessageHandler。 最后,我们将这两个 bean 注入到 ScatterGatherHandler bean 定义中,并将其标记为 @ServiceActivator,以将散发-聚集组件连接到集成流中。

以下示例展示了如何使用 XML 命名空间配置 <scatter-gather> 端点:

<scatter-gather
		id=""  [id="CO1-1"]1
		auto-startup=""  [id="CO1-2"]2
		input-channel=""  [id="CO1-3"]3
		output-channel=""  [id="CO1-4"]4
		scatter-channel=""  [id="CO1-5"]5
		gather-channel=""  [id="CO1-6"]6
		order=""  [id="CO1-7"]7
		phase=""  [id="CO1-8"]8
		send-timeout=""  [id="CO1-9"]9
		gather-timeout=""  [id="CO1-10"]10
		requires-reply="" > [id="CO1-11"]11
			<scatterer/>  [id="CO1-12"]12
			<gatherer/>  [id="CO1-13"]13
</scatter-gather>
 <1>  端点的 ID。
`ScatterGatherHandler` bean 以 `id + '.handler'` 的别名注册。
`RecipientListRouter` bean 以 `id + '.scatterer'` 的别名注册。
`AggregatingMessageHandler` bean 以 `id + '.gatherer'` 的别名注册。
可选。
(`BeanFactory` 生成一个默认的 `id` 值。)
 <1>  生命周期属性,表示端点是否应在应用程序上下文初始化期间启动。
此外,`ScatterGatherHandler` 还实现了 `Lifecycle`,并启动和停止 `gatherEndpoint`,如果提供了 `gather-channel`,则 `gatherEndpoint` 会在内部创建。
可选。
(默认值为 `true`。)
 <1>  用于在 `ScatterGatherHandler` 中接收请求消息并处理它们的通道。
必需。
 <1>  `ScatterGatherHandler` 将聚合结果发送到的通道。
可选。
(传入消息可以在 `replyChannel` 消息头中指定回复通道)。
 <1>  用于拍卖场景的散发消息发送到的通道。
可选。
与 `<scatterer>` 子元素互斥。
 <1>  用于接收来自每个供应商的聚合回复的通道。
它用作散发消息中的 `replyChannel` 头部。
可选。
默认情况下,会创建 `FixedSubscriberChannel`。
 <1>  当多个处理器订阅到同一个 `DirectChannel` 时,此组件的顺序(用于负载均衡)。
可选。
 <1>  指定端点应在哪个阶段启动和停止。
启动顺序从低到高,关闭顺序从高到低。
默认情况下,此值为 `Integer.MAX_VALUE`,这意味着此容器尽可能晚启动,尽可能早停止。
可选。
 <1>  将回复 `Message` 发送到 `output-channel` 时等待的超时时间间隔。
默认情况下,`send()` 阻塞一秒钟。
它仅适用于输出通道具有某些“发送”限制的情况——例如,一个容量固定的 `QueueChannel` 满了。
在这种情况下,会抛出 `MessageDeliveryException`。
`send-timeout` 对于 `AbstractSubscribableChannel` 实现会被忽略。
对于 `group-timeout(-expression)`,来自计划的过期任务的 `MessageDeliveryException` 会导致此任务被重新计划。
可选。
 <1>  允许您指定散发-聚集在返回之前等待回复消息的时间。
默认情况下,它等待 `30` 秒。
如果回复超时,则返回“null”。
可选。
 <1>  指定散发-聚集是否必须返回非空值。
此值默认为 `true`。
因此,当底层聚合器在 `gather-timeout` 后返回 null 值时,会抛出 `ReplyRequiredException`。
请注意,如果 `null` 是一种可能性,则应指定 `gather-timeout` 以避免无限期等待。
 <1>  `<recipient-list-router>` 选项。
可选。
与 `scatter-channel` 属性互斥。
 <1>  `<aggregator>` 选项。
必需。

错误处理

由于散发-聚集是一个多请求-回复组件,错误处理具有一些额外的复杂性。 在某些情况下,如果 ReleaseStrategy 允许进程以少于请求的回复完成,最好只是捕获并忽略下游异常。 在其他情况下,当发生错误时,应考虑从子流返回“补偿消息”之类的东西。

每个异步子流都应配置 errorChannel 头部,以便 MessagePublishingErrorHandler 发送正确的错误消息。 否则,错误将发送到全局 errorChannel,并使用通用的错误处理逻辑。 有关异步错误处理的更多信息,请参阅 scatter-gather-error-handling

同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice 来忽略异常或返回补偿消息。 当其中一个子流向 ScatterGatherHandler 抛出异常时,它只是重新抛出到上游。 这样,所有其他子流将白费功夫,它们的回复将在 ScatterGatherHandler 中被忽略。 这有时可能是预期的行为,但在大多数情况下,最好在特定的子流中处理错误,而不会影响所有其他子流和收集器中的预期。

从 5.1.3 版本开始,ScatterGatherHandler 提供了 errorChannelName 选项。 它填充到散发消息的 errorChannel 头部,并在发生异步错误时使用,或者可以在常规同步子流中用于直接发送错误消息。

以下示例配置演示了通过返回补偿消息进行异步错误处理:

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

为了产生正确的回复,我们必须从已由 MessagePublishingErrorHandler 发送到 scatterGatherErrorChannelMessagingExceptionfailedMessage 中复制头部(包括 replyChannelerrorChannel)。 这样,目标异常将返回到 ScatterGatherHandler 的收集器,以完成回复消息组。 这样的异常 payload 可以在收集器的 MessageGroupProcessor 中过滤掉,或者在散发-聚集端点之后以其他方式进行下游处理。

在将散发结果发送到收集器之前,ScatterGatherHandler 会恢复请求消息头部,包括回复和错误通道(如果有)。 这样,即使在散发接收者子流中应用了异步移交,来自 AggregatingMessageHandler 的错误也会传播到调用者。 为了成功操作,gatherResultChanneloriginalReplyChanneloriginalErrorChannel 头部必须传回到来自散发接收者子流的回复中。 在这种情况下,必须为 ScatterGatherHandler 配置一个合理、有限的 gatherTimeout。 否则,默认情况下,它将永远阻塞等待来自收集器的回复。