R2DBC 支持

Spring Integration 提供了通道适配器,用于通过 R2DBC 驱动程序以响应式方式访问数据库来接收和发送消息。 你需要将此依赖项包含到你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:{project-version}"

R2DBC 入站通道适配器

R2dbcMessageSource 是一个可轮询的 MessageSource 实现,它基于 R2dbcEntityOperations,并根据 expectSingleResult 选项,生成以 FluxMono 作为有效载荷的消息,用于从数据库中获取数据。 SELECT 查询可以是静态提供的,也可以是基于 SpEL 表达式的,该表达式在每次 receive() 调用时进行评估。 R2dbcMessageSource.SelectCreator 作为评估上下文的根对象存在,允许使用 StatementMapper.SelectSpec 流式 API。 默认情况下,此通道适配器将 SELECT 中的记录映射到 LinkedCaseInsensitiveMap 实例中。 可以通过提供 payloadType 选项来自定义,该选项在 EntityRowMapper 内部使用,基于 this.r2dbcEntityOperations.getConverter()updateSql 是可选的,用于标记数据库中已读取的记录,以便在后续轮询中跳过。 UPDATE 操作可以提供一个 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>,以便根据 SELECT 结果中的记录将值绑定到 UPDATE 中。

此通道适配器的典型配置可能如下所示:

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道适配器的配置如下:

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 出站通道适配器

R2dbcMessageHandler 是一个 ReactiveMessageHandler 实现,用于使用提供的 R2dbcEntityOperations 在数据库中执行 INSERT(默认)、UPDATEDELETE 查询。 R2dbcMessageHandler.Type 可以静态配置,也可以通过针对请求消息的 SpEL 表达式进行配置。 要执行的查询可以基于 tableNamevaluescriteria 表达式选项,或者(如果未提供 tableName)整个消息有效载荷被视为 org.springframework.data.relational.core.mapping.Table 实体以执行 SQL。 包 org.springframework.data.relational.core.query 已注册为 SpEL 评估上下文的导入,以便直接访问 Criteria 流式 API,该 API 用于 UPDATEDELETE 查询。 valuesExpression 用于 INSERTUPDATE,并且必须评估为 Map,以用于列值对,从而针对请求消息在目标表中执行更改。

此通道适配器的典型配置可能如下所示:

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道适配器的配置如下:

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))