RSocket 支持

RSocket Spring Integration 模块 (spring-integration-rsocket) 允许执行 RSocket 应用程序协议。 你需要将此依赖项包含到你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:{project-version}"

该模块从 5.2 版本开始提供,并基于 Spring Messaging 基础及其 RSocket 组件实现,例如 RSocketRequesterRSocketMessageHandlerRSocketStrategies。 有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持。 在通过通道适配器启动集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。 为此,Spring Integration RSocket 支持提供了 AbstractRSocketConnectorServerRSocketConnectorClientRSocketConnector 实现。 ServerRSocketConnector 根据提供的 io.rsocket.transport.ServerTransport 在主机和端口上公开一个侦听器,用于接受来自客户端的连接。 内部 RSocketServer 实例可以通过 setServerConfigurer() 进行自定义,也可以配置其他选项,例如用于有效负载数据和头部元数据的 RSocketStrategiesMimeType。 当客户端请求者提供 setupRoute(参见下面的 ClientRSocketConnector)时,连接的客户端作为 RSocketRequester 存储在由 clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object> 确定的键下。 默认情况下,连接数据用作键,作为转换为 UTF-8 字符集的字符串值。 这样的 RSocketRequester 注册表可以在应用程序逻辑中用于确定特定的客户端连接以与其交互,或将相同的消息发布到所有连接的客户端。 当客户端建立连接时,ServerRSocketConnector 会发出 RSocketConnectedEvent。 这与 Spring Messaging 模块中 @ConnectMapping 注解提供的功能类似。 映射模式 * 意味着接受所有客户端路由。 RSocketConnectedEvent 可用于通过 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER 头部区分不同的路由。 典型的服务器配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有选项,包括 RSocketStrategies bean 和用于 RSocketConnectedEvent@EventListener,都是可选的。 有关更多信息,请参阅 ServerRSocketConnector JavaDocs。 从 5.2.1 版本开始,ServerRSocketMessageHandler 被提取到公共的顶级类,以便可能与现有的 RSocket 服务器连接。 当 ServerRSocketConnector 提供外部 ServerRSocketMessageHandler 实例时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。 此外,ServerRSocketMessageHandler 可以配置 messageMappingCompatible 标志,以处理 RSocket 控制器的 @MessageMapping,完全替换标准 RSocketMessageHandler 提供的功能。 这在混合配置中很有用,当经典的 @MessageMapping 方法与 RSocket 通道适配器和应用程序中外部配置的 RSocket 服务器同时存在时。 ClientRSocketConnector 作为基于通过提供的 ClientTransport 连接的 RSocketRSocketRequester 的持有者。 RSocketConnector 可以通过提供的 RSocketConnectorConfigurer 进行自定义。 setupRoute(带有可选的模板变量)和带有元数据的 setupData 也可以在此组件上配置。 典型的客户端配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

这些选项中的大多数(包括 RSocketStrategies bean)都是可选的。 请注意我们如何连接到任意端口上本地启动的 RSocket 服务器。 有关 setupData 用例,请参阅 ServerRSocketConnector.clientRSocketKeyStrategy。 另请参阅 ClientRSocketConnector 及其 AbstractRSocketConnector 超类的 JavaDocs,以获取更多信息。 ClientRSocketConnectorServerRSocketConnector 都负责将入站通道适配器映射到其 path 配置,以用于路由传入的 RSocket 请求。 有关更多信息,请参阅下一节。

RSocket 入站网关

RSocketInboundGateway 负责接收 RSocket 请求并生成响应(如果有)。 它需要一个 path 映射数组,该数组可以是类似于 MVC 请求映射或 @MessageMapping 语义的模式。 此外,(自 5.2.2 版本起),可以在 RSocketInboundGateway 上配置一组交互模型(参见 RSocketInteractionModel),以通过特定帧类型限制 RSocket 请求到此端点。 默认情况下,支持所有交互模型。 这样一个 bean,根据其 IntegrationRSocketEndpoint 实现(ReactiveMessageHandler 的扩展),由 ServerRSocketConnectorClientRSocketConnector 自动检测,用于内部 IntegrationRSocketMessageHandler 中的传入请求路由逻辑。 AbstractRSocketConnector 可以提供给 RSocketInboundGateway 以进行显式端点注册。 这样,该 AbstractRSocketConnector 上的自动检测选项被禁用。 RSocketStrategies 也可以注入到 RSocketInboundGateway 中,或者从提供的 AbstractRSocketConnector 获取,覆盖任何显式注入。 解码器用于那些 RSocketStrategies,根据提供的 requestElementType 解码请求有效负载。 如果传入 Message 中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 头部,RSocketInboundGateway 将请求视为 fireAndForget RSocket 交互模型。 在这种情况下,RSocketInboundGateway 执行一个普通的 send 操作到 outputChannel。 否则,RSocketPayloadReturnValueHandler.RESPONSE_HEADER 头部中的 MonoProcessor 值用于向 RSocket 发送回复。 为此,RSocketInboundGatewayoutputChannel 执行 sendAndReceiveMessageReactive 操作。 根据 MessagingRSocket 逻辑,要发送到下游的消息的 payload 始终是 Flux。 当处于 fireAndForget RSocket 交互模型时,消息具有普通的转换 payload。 回复 payload 可以是普通对象或 Publisher - RSocketInboundGateway 根据 RSocketStrategies 中提供的编码器将它们正确地转换为 RSocket 响应。

从 5.3 版本开始,decodeFluxAsUnit 选项(默认 false)已添加到 RSocketInboundGateway。 默认情况下,传入的 Flux 会以其每个事件单独解码的方式进行转换。 这是目前 @MessageMapping 语义中存在的精确行为。 要恢复以前的行为或根据应用程序要求将整个 Flux 解码为单个单元,必须将 decodeFluxAsUnit 设置为 true。 但是,目标解码逻辑取决于所选的 Decoder,例如,StringDecoder 需要流中存在换行符(默认情况下)才能指示字节缓冲区结束。

有关如何配置 RSocketInboundGateway 端点以及如何处理下游有效负载的示例,请参阅 rsocket-java-config

RSocket 出站网关

RSocketOutboundGateway 是一个 AbstractReplyProducingMessageHandler,用于向 RSocket 发送请求并根据 RSocket 响应(如果有)生成回复。 低级 RSocket 协议交互委托给从提供的 ClientRSocketConnector 或从服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 头部解析的 RSocketRequester。 服务器端的目标 RSocketRequester 可以从 RSocketConnectedEvent 或使用 ServerRSocketConnector.getClientRSocketRequester() API 根据通过 ServerRSocketConnector.setClientRSocketKeyStrategy() 为连接请求映射选择的某个业务键来解析。 有关更多信息,请参阅 ServerRSocketConnector JavaDocs。

发送请求的 route 必须明确配置(以及路径变量)或通过针对请求消息评估的 SpEL 表达式进行配置。

RSocket 交互模型可以通过 RSocketInteractionModel 选项或相应的表达式设置来提供。 默认情况下,requestResponse 用于常见的网关用例。

当请求消息有效负载是 Publisher 时,可以提供 publisherElementType 选项,以根据目标 RSocketRequester 中提供的 RSocketStrategies 对其元素进行编码。 此选项的表达式可以评估为 ParameterizedTypeReference。 有关数据及其类型的更多信息,请参阅 RSocketRequester.RequestSpec.data() JavaDocs。

RSocket 请求还可以通过 metadata 进行增强。 为此,可以在 RSocketOutboundGateway 上配置针对请求消息的 metadataExpression。 这样的表达式必须评估为 Map<Object, MimeType>

interactionModel 不是 fireAndForget 时,必须提供 expectedResponseType。 它默认为 String.class。 此选项的表达式可以评估为 ParameterizedTypeReference。 有关回复数据及其类型的更多信息,请参阅 RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux() JavaDocs。

RSocketOutboundGateway 的回复 payload 始终是 Mono(即使对于 fireAndForget 交互模型,它也是 Mono<Void>),始终使此组件成为 async。 在生成到 outputChannel 以用于常规通道之前,此 Mono 会被订阅,或由 FluxMessageChannel 按需处理。 requestStreamrequestChannel 交互模型的 Flux 响应也包装在回复 Mono 中。 它可以由 FluxMessageChannel 通过直通服务激活器进行下游扁平化:

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或在目标应用程序逻辑中显式订阅。

预期响应类型也可以配置(或通过表达式评估)为 void,将此网关视为出站通道适配器。 但是,outputChannel 仍必须配置(即使它只是 NullChannel)以启动对返回的 Mono 的订阅。

有关如何配置 RSocketOutboundGateway 端点以及如何处理下游有效负载的示例,请参阅 rsocket-java-config

RSocket 命名空间支持

Spring Integration 提供了 rsocket 命名空间和相应的模式定义。 要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

入站

要使用 XML 配置 Spring Integration RSocket 入站通道适配器,你需要使用 int-rsocket 命名空间中适当的 inbound-gateway 组件。 以下示例演示了如何配置它:

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

ClientRSocketConnectorServerRSocketConnector 应配置为通用 <bean> 定义。

出站

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

有关所有这些 XML 属性的描述,请参阅 spring-integration-rsocket.xsd

使用 Java 配置 RSocket 端点

以下示例演示了如何使用 Java 配置 RSocket 入站端点:

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

此配置中假定存在 ClientRSocketConnectorServerRSocketConnector,其含义是自动检测“echo”路径上的此类端点。 请注意 @Transformer 签名,它完全反应式地处理 RSocket 请求并生成反应式回复。

以下示例演示了如何使用 Java DSL 配置 RSocket 入站网关:

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

此配置中假定存在 ClientRSocketConnectorServerRSocketConnector,其含义是自动检测“/uppercase”路径上的此类端点以及预期的交互模型为“request channel”。

以下示例演示了如何使用 Java 配置 RSocket 出站网关:

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

setClientRSocketConnector() 仅在客户端需要。 在服务器端,请求消息中必须提供带有 RSocketRequester 值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 头部。

以下示例演示了如何使用 Java DSL 配置 RSocket 出站网关:

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

有关如何在此流开头使用上述 Function 接口的更多信息,请参阅 IntegrationFlow 作为网关