RSocket 支持
RSocket Spring Integration 模块 (spring-integration-rsocket
) 允许执行 RSocket 应用程序协议。
你需要将此依赖项包含到你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:{project-version}"
该模块从 5.2 版本开始提供,并基于 Spring Messaging 基础及其 RSocket 组件实现,例如 RSocketRequester
、RSocketMessageHandler
和 RSocketStrategies
。
有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持。
在通过通道适配器启动集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。
为此,Spring Integration RSocket 支持提供了 AbstractRSocketConnector
的 ServerRSocketConnector
和 ClientRSocketConnector
实现。
ServerRSocketConnector
根据提供的 io.rsocket.transport.ServerTransport
在主机和端口上公开一个侦听器,用于接受来自客户端的连接。
内部 RSocketServer
实例可以通过 setServerConfigurer()
进行自定义,也可以配置其他选项,例如用于有效负载数据和头部元数据的 RSocketStrategies
和 MimeType
。
当客户端请求者提供 setupRoute
(参见下面的 ClientRSocketConnector
)时,连接的客户端作为 RSocketRequester
存储在由 clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
确定的键下。
默认情况下,连接数据用作键,作为转换为 UTF-8 字符集的字符串值。
这样的 RSocketRequester
注册表可以在应用程序逻辑中用于确定特定的客户端连接以与其交互,或将相同的消息发布到所有连接的客户端。
当客户端建立连接时,ServerRSocketConnector
会发出 RSocketConnectedEvent
。
这与 Spring Messaging 模块中 @ConnectMapping
注解提供的功能类似。
映射模式 *
意味着接受所有客户端路由。
RSocketConnectedEvent
可用于通过 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
头部区分不同的路由。
典型的服务器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项,包括 RSocketStrategies
bean 和用于 RSocketConnectedEvent
的 @EventListener
,都是可选的。
有关更多信息,请参阅 ServerRSocketConnector
JavaDocs。
从 5.2.1 版本开始,ServerRSocketMessageHandler
被提取到公共的顶级类,以便可能与现有的 RSocket 服务器连接。
当 ServerRSocketConnector
提供外部 ServerRSocketMessageHandler
实例时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。
此外,ServerRSocketMessageHandler
可以配置 messageMappingCompatible
标志,以处理 RSocket 控制器的 @MessageMapping
,完全替换标准 RSocketMessageHandler
提供的功能。
这在混合配置中很有用,当经典的 @MessageMapping
方法与 RSocket 通道适配器和应用程序中外部配置的 RSocket 服务器同时存在时。
ClientRSocketConnector
作为基于通过提供的 ClientTransport
连接的 RSocket
的 RSocketRequester
的持有者。
RSocketConnector
可以通过提供的 RSocketConnectorConfigurer
进行自定义。
setupRoute
(带有可选的模板变量)和带有元数据的 setupData
也可以在此组件上配置。
典型的客户端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项中的大多数(包括 RSocketStrategies
bean)都是可选的。
请注意我们如何连接到任意端口上本地启动的 RSocket 服务器。
有关 setupData
用例,请参阅 ServerRSocketConnector.clientRSocketKeyStrategy
。
另请参阅 ClientRSocketConnector
及其 AbstractRSocketConnector
超类的 JavaDocs,以获取更多信息。
ClientRSocketConnector
和 ServerRSocketConnector
都负责将入站通道适配器映射到其 path
配置,以用于路由传入的 RSocket 请求。
有关更多信息,请参阅下一节。
RSocket 入站网关
RSocketInboundGateway
负责接收 RSocket 请求并生成响应(如果有)。
它需要一个 path
映射数组,该数组可以是类似于 MVC 请求映射或 @MessageMapping
语义的模式。
此外,(自 5.2.2 版本起),可以在 RSocketInboundGateway
上配置一组交互模型(参见 RSocketInteractionModel
),以通过特定帧类型限制 RSocket 请求到此端点。
默认情况下,支持所有交互模型。
这样一个 bean,根据其 IntegrationRSocketEndpoint
实现(ReactiveMessageHandler
的扩展),由 ServerRSocketConnector
或 ClientRSocketConnector
自动检测,用于内部 IntegrationRSocketMessageHandler
中的传入请求路由逻辑。
AbstractRSocketConnector
可以提供给 RSocketInboundGateway
以进行显式端点注册。
这样,该 AbstractRSocketConnector
上的自动检测选项被禁用。
RSocketStrategies
也可以注入到 RSocketInboundGateway
中,或者从提供的 AbstractRSocketConnector
获取,覆盖任何显式注入。
解码器用于那些 RSocketStrategies
,根据提供的 requestElementType
解码请求有效负载。
如果传入 Message
中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
头部,RSocketInboundGateway
将请求视为 fireAndForget
RSocket 交互模型。
在这种情况下,RSocketInboundGateway
执行一个普通的 send
操作到 outputChannel
。
否则,RSocketPayloadReturnValueHandler.RESPONSE_HEADER
头部中的 MonoProcessor
值用于向 RSocket 发送回复。
为此,RSocketInboundGateway
对 outputChannel
执行 sendAndReceiveMessageReactive
操作。
根据 MessagingRSocket
逻辑,要发送到下游的消息的 payload
始终是 Flux
。
当处于 fireAndForget
RSocket 交互模型时,消息具有普通的转换 payload
。
回复 payload
可以是普通对象或 Publisher
- RSocketInboundGateway
根据 RSocketStrategies
中提供的编码器将它们正确地转换为 RSocket 响应。
从 5.3 版本开始,decodeFluxAsUnit
选项(默认 false
)已添加到 RSocketInboundGateway
。
默认情况下,传入的 Flux
会以其每个事件单独解码的方式进行转换。
这是目前 @MessageMapping
语义中存在的精确行为。
要恢复以前的行为或根据应用程序要求将整个 Flux
解码为单个单元,必须将 decodeFluxAsUnit
设置为 true
。
但是,目标解码逻辑取决于所选的 Decoder
,例如,StringDecoder
需要流中存在换行符(默认情况下)才能指示字节缓冲区结束。
有关如何配置 RSocketInboundGateway
端点以及如何处理下游有效负载的示例,请参阅 rsocket-java-config。
RSocket 出站网关
RSocketOutboundGateway
是一个 AbstractReplyProducingMessageHandler
,用于向 RSocket 发送请求并根据 RSocket 响应(如果有)生成回复。
低级 RSocket 协议交互委托给从提供的 ClientRSocketConnector
或从服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头部解析的 RSocketRequester
。
服务器端的目标 RSocketRequester
可以从 RSocketConnectedEvent
或使用 ServerRSocketConnector.getClientRSocketRequester()
API 根据通过 ServerRSocketConnector.setClientRSocketKeyStrategy()
为连接请求映射选择的某个业务键来解析。
有关更多信息,请参阅 ServerRSocketConnector
JavaDocs。
发送请求的 route
必须明确配置(以及路径变量)或通过针对请求消息评估的 SpEL 表达式进行配置。
RSocket 交互模型可以通过 RSocketInteractionModel
选项或相应的表达式设置来提供。
默认情况下,requestResponse
用于常见的网关用例。
当请求消息有效负载是 Publisher
时,可以提供 publisherElementType
选项,以根据目标 RSocketRequester
中提供的 RSocketStrategies
对其元素进行编码。
此选项的表达式可以评估为 ParameterizedTypeReference
。
有关数据及其类型的更多信息,请参阅 RSocketRequester.RequestSpec.data()
JavaDocs。
RSocket 请求还可以通过 metadata
进行增强。
为此,可以在 RSocketOutboundGateway
上配置针对请求消息的 metadataExpression
。
这样的表达式必须评估为 Map<Object, MimeType>
。
当 interactionModel
不是 fireAndForget
时,必须提供 expectedResponseType
。
它默认为 String.class
。
此选项的表达式可以评估为 ParameterizedTypeReference
。
有关回复数据及其类型的更多信息,请参阅 RSocketRequester.RetrieveSpec.retrieveMono()
和 RSocketRequester.RetrieveSpec.retrieveFlux()
JavaDocs。
RSocketOutboundGateway
的回复 payload
始终是 Mono
(即使对于 fireAndForget
交互模型,它也是 Mono<Void>
),始终使此组件成为 async
。
在生成到 outputChannel
以用于常规通道之前,此 Mono
会被订阅,或由 FluxMessageChannel
按需处理。
requestStream
或 requestChannel
交互模型的 Flux
响应也包装在回复 Mono
中。
它可以由 FluxMessageChannel
通过直通服务激活器进行下游扁平化:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用程序逻辑中显式订阅。
预期响应类型也可以配置(或通过表达式评估)为 void
,将此网关视为出站通道适配器。
但是,outputChannel
仍必须配置(即使它只是 NullChannel
)以启动对返回的 Mono
的订阅。
有关如何配置 RSocketOutboundGateway
端点以及如何处理下游有效负载的示例,请参阅 rsocket-java-config。
RSocket 命名空间支持
Spring Integration 提供了 rsocket
命名空间和相应的模式定义。
要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入站
要使用 XML 配置 Spring Integration RSocket 入站通道适配器,你需要使用 int-rsocket
命名空间中适当的 inbound-gateway
组件。
以下示例演示了如何配置它:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
ClientRSocketConnector
和 ServerRSocketConnector
应配置为通用 <bean>
定义。
出站
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有关所有这些 XML 属性的描述,请参阅 spring-integration-rsocket.xsd
。
使用 Java 配置 RSocket 端点
以下示例演示了如何使用 Java 配置 RSocket 入站端点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
此配置中假定存在 ClientRSocketConnector
或 ServerRSocketConnector
,其含义是自动检测“echo
”路径上的此类端点。
请注意 @Transformer
签名,它完全反应式地处理 RSocket 请求并生成反应式回复。
以下示例演示了如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
此配置中假定存在 ClientRSocketConnector
或 ServerRSocketConnector
,其含义是自动检测“/uppercase
”路径上的此类端点以及预期的交互模型为“request channel
”。
以下示例演示了如何使用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector()
仅在客户端需要。
在服务器端,请求消息中必须提供带有 RSocketRequester
值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头部。
以下示例演示了如何使用 Java DSL 配置 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何在此流开头使用上述 Function
接口的更多信息,请参阅 IntegrationFlow
作为网关。