Operator gateway()
IntegrationFlow
定义中的 gateway()
运算符是一个特殊的Service Activator实现,用于通过其输入通道调用其他端点或集成流并等待回复。
从技术上讲,它与 <chain>
定义中嵌套的 <gateway>
组件扮演相同的角色(参见 从链内部调用链),并使流更清晰、更直接。
从逻辑上和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见 消息网关)。
此运算符有几个重载,用于不同的目的:
-
gateway(String requestChannel)
通过名称向某个端点的输入通道发送消息; -
gateway(MessageChannel requestChannel)
通过直接注入向某个端点的输入通道发送消息; -
gateway(IntegrationFlow flow)
向提供的IntegrationFlow
的输入通道发送消息。
所有这些都带有一个第二个 Consumer<GatewayEndpointSpec>
参数的变体,用于配置目标 GatewayMessageHandler
和相应的 AbstractEndpoint
。
此外,基于 IntegrationFlow
的方法允许调用现有 IntegrationFlow
bean 或通过 IntegrationFlow
函数式接口的内联 lambda 将流声明为子流,或者将其提取到 private
方法中,以实现更简洁的代码风格:
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流不总是返回回复,您应该将 requestTimeout
设置为 0,以防止调用线程无限期挂起。
在这种情况下,流将在该点结束,并且线程被释放以进行进一步的工作。
从 6.5 版本开始,此 gateway()
运算符完全支持 async(true)
行为。
在内部,为 GatewayProxyFactoryBean
提供了一个 AsyncRequestReplyExchanger
服务接口。
由于 AsyncRequestReplyExchanger
契约是 CompletableFuture<Message<?>>
,因此整个请求-回复以异步方式执行。