使用 R2DBC 进行数据访问
R2DBC(“Reactive Relational Database Connectivity”)是一项由社区推动的 规范工作,旨在标准化使用响应式模式访问 SQL 数据库。
包层次结构
Spring Framework 的 R2DBC 抽象框架包含两个不同的包:
-
core
:org.springframework.r2dbc.core
包包含DatabaseClient
类以及各种相关类。请参阅 使用 R2DBC 核心类控制基本的 R2DBC 处理和错误处理。 -
connection
:org.springframework.r2dbc.connection
包包含一个用于 轻松访问ConnectionFactory
的工具类,以及各种简单的ConnectionFactory
实现, 您可以将其用于测试和运行未经修改的 R2DBC。请参阅 控制数据库连接。
使用 R2DBC 核心类控制基本的 R2DBC 处理和错误处理
本节介绍如何使用 R2DBC 核心类控制基本的 R2DBC 处理, 包括错误处理。它包括以下主题:
使用 DatabaseClient
DatabaseClient
是 R2DBC 核心包中的核心类。它负责
资源的创建和释放,这有助于避免常见的错误,例如
忘记关闭连接。它执行核心 R2DBC
工作流的基本任务(例如语句创建和执行),让应用程序代码提供
SQL 并提取结果。DatabaseClient
类:
-
运行 SQL 查询
-
更新语句和存储过程调用
-
对
Result
实例执行迭代 -
捕获 R2DBC 异常并将其转换为
org.springframework.dao
包中定义的通用、更具信息性的异常层次结构。 (请参阅 一致的异常层次结构。)
该客户端具有功能性、流畅的 API,使用响应式类型进行声明式组合。
当您将 DatabaseClient
用于您的代码时,您只需实现
java.util.function
接口,赋予它们明确定义的契约。
给定由 DatabaseClient
类提供的 Connection
,Function
回调创建一个 Publisher
。对于提取 Row
结果的映射函数也是如此。
您可以通过直接使用 ConnectionFactory
引用实例化 DatabaseClient
,
或者在 Spring IoC 容器中配置它并将其作为 Bean 引用提供给 DAO,从而在 DAO 实现中使用 DatabaseClient
。
创建 DatabaseClient
对象的最简单方法是通过静态工厂方法,如下所示:
-
Java
-
Kotlin
DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
|
上述方法使用默认设置创建 DatabaseClient
。
您还可以从 DatabaseClient.builder()
获取 Builder
实例。
您可以通过调用以下方法来自定义客户端:
-
….bindMarkers(…)
:提供特定的BindMarkersFactory
以配置命名 参数到数据库绑定标记的转换。 -
….executeFunction(…)
:设置ExecuteFunction
以指定Statement
对象如何 运行。 -
….namedParameters(false)
:禁用命名参数扩展。默认启用。
方言由 |
目前支持的数据库有:
-
H2
-
MariaDB
-
Microsoft SQL Server
-
MySQL
-
Postgres
此类的所有 SQL 都以 DEBUG
级别记录在与客户端实例的完全限定类名对应的类别下(通常是
DefaultDatabaseClient
)。此外,每次执行都会在
响应式序列中注册一个检查点以帮助调试。
以下部分提供了一些 DatabaseClient
用法的示例。这些示例
并非 DatabaseClient
公开的所有功能的详尽列表。
请参阅随附的 javadoc 以获取更多信息。
执行语句
DatabaseClient
提供了运行语句的基本功能。
以下示例展示了创建新表所需的最小但功能齐全的
代码:
-
Java
-
Kotlin
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.await()
DatabaseClient
旨在提供方便、流畅的用法。
它在执行规范的每个阶段都暴露了中间、延续和终端方法。上面的示例
使用 then()
返回一个完成 Publisher
,该 Publisher
在查询(如果 SQL 查询包含
多个语句,则为多个查询)完成后立即完成。
|
查询 (SELECT
)
SQL 查询可以通过 Row
对象或受影响的行数返回值。
DatabaseClient
可以返回更新的行数或行本身,
具体取决于发出的查询。
以下查询从表中获取 id
和 name
列:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitSingle()
以下查询使用绑定变量:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitSingle()
您可能已经注意到上面示例中使用了 fetch()
。fetch()
是一个
延续操作符,可让您指定要消耗多少数据。
调用 first()
返回结果中的第一行并丢弃其余行。
您可以使用以下操作符来消费数据:
-
first()
返回整个结果的第一行。其 Kotlin Coroutine 变体 对于非空返回值命名为awaitSingle()
,如果值是可选的,则命名为awaitSingleOrNull()
。 -
one()
返回恰好一个结果,如果结果包含多行则失败。 使用 Kotlin Coroutines,awaitOne()
用于恰好一个值,如果值可能为null
,则使用awaitOneOrNull()
。 -
all()
返回结果的所有行。使用 Kotlin Coroutines 时,使用flow()
。 -
rowsUpdated()
返回受影响的行数(INSERT
/UPDATE
/DELETE
计数)。其 Kotlin Coroutine 变体命名为awaitRowsUpdated()
。
如果未指定进一步的映射细节,查询将以 Map
的形式返回表格结果,
其中键是不区分大小写的列名,映射到其列值。
您可以通过提供一个 Function<Row, T>
来控制结果映射,该函数将为每个 Row
调用,以便它可以返回任意值(单个值、
集合和映射以及对象)。
以下示例提取 name
列并发出其值:
-
Java
-
Kotlin
Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();
val names = client.sql("SELECT name FROM person")
.map{ row: Row -> row.get("name", String.class) }
.flow()
或者,有一个映射到单个值的快捷方式:
Flux<String> names = client.sql("SELECT name FROM person")
.mapValue(String.class)
.all();
或者您可以映射到具有 Bean 属性或记录组件的结果对象:
// assuming a name property on Person
Flux<Person> persons = client.sql("SELECT name FROM person")
.mapProperties(Person.class)
.all();
null
呢?关系数据库结果可以包含 null
值。
Reactive Streams 规范禁止发出 null
值。
该要求强制在提取器函数中进行适当的 null
处理。
虽然您可以从 Row
获取 null
值,但您不得发出 null
值。您必须将任何 null
值包装在一个对象中(例如,
对于单个值,使用 Optional
),以确保您的提取器函数永远不会直接返回
null
值。
使用 DatabaseClient
更新 (INSERT
、UPDATE
和 DELETE
)
修改语句的唯一区别是这些语句通常
不返回表格数据,因此您使用 rowsUpdated()
来消费结果。
以下示例显示了一个 UPDATE
语句,该语句返回
更新的行数:
-
Java
-
Kotlin
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitRowsUpdated()
将值绑定到查询
典型的应用程序需要参数化 SQL 语句来根据某些输入选择或
更新行。这些通常是受 WHERE
子句约束的 SELECT
语句,
或接受输入参数的 INSERT
和 UPDATE
语句。如果
参数未正确转义,参数化语句存在 SQL 注入的风险。
DatabaseClient
利用 R2DBC 的 bind
API 来消除查询参数的 SQL 注入风险。
您可以使用 execute(…)
操作符提供参数化 SQL 语句,
并将参数绑定到实际的 Statement
。然后,您的 R2DBC 驱动程序通过使用
预编译语句和参数替换来运行该语句。
参数绑定支持两种绑定策略:
-
按索引,使用从零开始的参数索引。
-
按名称,使用占位符名称。
以下示例显示了查询的参数绑定:
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
或者,您可以传入一个名称和值的映射:
Map<String, Object> params = new LinkedHashMap<>();
params.put("id", "joe");
params.put("name", "Joe");
params.put("age", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(params);
或者您可以传入一个带有 Bean 属性或记录组件的参数对象:
// assuming id, name, age properties on Person
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindProperties(new Person("joe", "Joe", 34);
或者,您可以使用位置参数将值绑定到语句。 索引从零开始。
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind(0, "joe")
.bind(1, "Joe")
.bind(2, 34);
如果您的应用程序绑定到许多参数,可以通过一次调用实现相同的效果:
List<?> values = List.of("joe", "Joe", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(values);
R2DBC 使用依赖于实际数据库供应商的数据库原生绑定标记。
例如,Postgres 使用索引标记,例如 $1
、$2
、$n
。
另一个例子是 SQL Server,它使用以 @
为前缀的命名绑定标记。
这与 JDBC 不同,JDBC 要求 ?
作为绑定标记。
在 JDBC 中,实际的驱动程序将 ?
绑定标记转换为数据库原生
标记,作为其语句执行的一部分。
Spring Framework 的 R2DBC 支持允许您使用原生绑定标记或使用 :name
语法的命名绑定
标记。
命名参数支持利用 BindMarkersFactory
实例在查询执行时将命名
参数扩展为原生绑定标记,这使您可以在各种数据库供应商之间实现
一定程度的查询可移植性。
查询预处理器将命名的 Collection
参数展开为一系列绑定
标记,从而无需根据参数数量动态创建查询。
嵌套对象数组被扩展以允许使用(例如)选择列表。
考虑以下查询:
SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))
上述查询可以参数化并按如下方式运行:
-
Java
-
Kotlin
List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples)
选择列表的使用取决于供应商。 |
以下示例显示了使用 IN
谓词的更简单变体:
-
Java
-
Kotlin
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", arrayOf(35, 50))
R2DBC 本身不支持类似 Collection 的值。尽管如此,
在 Spring 的 R2DBC 支持中,扩展上述示例中给定的 |
语句过滤器
有时,您需要在实际 Statement
运行之前微调其选项。
为此,请向 DatabaseClient
注册一个 Statement
过滤器
(StatementFilterFunction
),以拦截和修改执行中的语句,
如下例所示:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name", …)
.bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
.bind("name", …)
.bind("state", …)
DatabaseClient
还公开了一个简化的 filter(…)
重载,它接受
一个 Function<Statement, Statement>
:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));
client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
client.sql("SELECT id, name, state FROM table")
.filter { statement -> s.fetchSize(25) }
StatementFilterFunction
实现允许过滤 Statement
和过滤 Result
对象。
DatabaseClient
最佳实践
DatabaseClient
类的实例一旦配置好,就是线程安全的。这很重要,
因为它意味着您可以配置 DatabaseClient
的单个实例,然后安全地将此共享引用注入到多个 DAO(或存储库)中。
DatabaseClient
是有状态的,因为它维护对 ConnectionFactory
的引用,
但此状态不是会话状态。
使用 DatabaseClient
类时的常见做法是在 Spring 配置文件中配置 ConnectionFactory
,
然后将该共享 ConnectionFactory
Bean 依赖注入到 DAO 类中。DatabaseClient
在
ConnectionFactory
的设置器中创建。这导致 DAO 类似于以下内容:
-
Java
-
Kotlin
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
private val databaseClient = DatabaseClient.create(connectionFactory)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
除了显式配置之外,还可以使用组件扫描和注解
支持进行依赖注入。在这种情况下,您可以使用 @Component
(使其成为组件扫描的候选者)注解该类,并使用 @Autowired
注解 ConnectionFactory
设置器
方法。以下示例显示了如何执行此操作:
- Java
-
@Component [id="CO1-1"][id="CO1-1"][id="CO1-1"](1) public class R2dbcCorporateEventDao implements CorporateEventDao { private DatabaseClient databaseClient; @Autowired [id="CO1-2"][id="CO1-2"][id="CO1-2"](2) public void setConnectionFactory(ConnectionFactory connectionFactory) { this.databaseClient = DatabaseClient.create(connectionFactory); [id="CO1-3"][id="CO1-3"][id="CO1-3"](3) } // R2DBC-backed implementations of the methods on the CorporateEventDao follow... }
<1> 使用 `@Component` 注解该类。 <1> 使用 `@Autowired` 注解 `ConnectionFactory` 设置器方法。 <1> 使用 `ConnectionFactory` 创建一个新的 `DatabaseClient`。
- Kotlin
-
@Component [id="CO2-1"][id="CO1-4"][id="CO2-1"](1) class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { [id="CO2-2"][id="CO1-5"][id="CO2-2"](2) private val databaseClient = DatabaseClient(connectionFactory) [id="CO2-3"][id="CO1-6"][id="CO2-3"](3) // R2DBC-backed implementations of the methods on the CorporateEventDao follow... }
<1> 使用 `@Component` 注解该类。 <1> `ConnectionFactory` 的构造函数注入。 <1> 使用 `ConnectionFactory` 创建一个新的 `DatabaseClient`。
无论您选择使用(或不使用)上述哪种模板初始化样式,
每次要运行 SQL 时都很少需要创建 DatabaseClient
类的新实例。
一旦配置好,DatabaseClient
实例就是线程安全的。
如果您的应用程序访问多个
数据库,您可能需要多个 DatabaseClient
实例,这需要多个
ConnectionFactory
,以及随后多个不同配置的 DatabaseClient
实例。
检索自动生成的主键
INSERT
语句在将行插入定义了自增或标识列的表时可能会生成主键。
要完全控制要生成的列名,只需注册一个 StatementFilterFunction
,
该函数请求所需列的生成主键。
-
Java
-
Kotlin
Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();
// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
.map { row -> row.get("id", Integer.class) }
.awaitOne()
// generatedId emits the generated key once the INSERT statement has finished
控制数据库连接
本节包括:
使用 ConnectionFactory
Spring 通过 ConnectionFactory
获取到数据库的 R2DBC 连接。
ConnectionFactory
是 R2DBC 规范的一部分,是驱动程序的通用入口点。
它允许容器或框架向应用程序代码隐藏连接池
和事务管理问题。作为开发人员,您不需要知道如何连接到数据库的细节。
这是设置 ConnectionFactory
的管理员的职责。您
在开发和测试代码时很可能同时扮演这两个角色,但您不
一定需要知道生产数据源是如何配置的。
当您使用 Spring 的 R2DBC 层时,您可以使用第三方提供的
连接池实现来配置您自己的连接池。一个流行的
实现是 R2DBC Pool (r2dbc-pool
)。Spring
发行版中的实现仅用于测试目的,不提供池化功能。
要配置 ConnectionFactory
:
-
像您通常获取 R2DBC
ConnectionFactory
一样,使用ConnectionFactory
获取连接。 -
提供一个 R2DBC URL (有关正确的值,请参阅您的驱动程序文档)。
以下示例显示了如何配置 ConnectionFactory
:
-
Java
-
Kotlin
ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
使用 ConnectionFactoryUtils
ConnectionFactoryUtils
类是一个方便且功能强大的辅助类,
它提供 static
方法来从 ConnectionFactory
获取连接并关闭连接(如果需要)。
它支持订阅者 Context
绑定连接,例如使用
R2dbcTransactionManager
。
使用 SingleConnectionFactory
SingleConnectionFactory
类是 DelegatingConnectionFactory
接口的一个实现,它包装了一个在每次使用后不会关闭的单个 Connection
。
如果任何客户端代码调用 close
,假设是池化连接(如使用
持久化工具时),则应将 suppressClose
属性设置为 true
。此设置
返回一个抑制关闭的代理,该代理包装了物理连接。请注意,您不能再将其
转换为原生 Connection
或类似对象。
SingleConnectionFactory
主要是一个测试类,如果您的 R2DBC 驱动程序允许此类使用,
则可用于特定要求,例如流水线。
与池化 ConnectionFactory
相反,它始终重用相同的连接,避免
过度创建物理连接。
使用 TransactionAwareConnectionFactoryProxy
TransactionAwareConnectionFactoryProxy
是目标 ConnectionFactory
的代理。
该代理包装了目标 ConnectionFactory
,以增加对 Spring 管理事务的感知。
如果您的 R2DBC 客户端未与 Spring 的 R2DBC 支持进行其他集成,
则必须使用此类。在这种情况下,您仍然可以使用此客户端,同时
让此客户端参与 Spring 管理的事务。通常
最好将 R2DBC 客户端与适当的 |
有关更多详细信息,请参阅 TransactionAwareConnectionFactoryProxy
javadoc。
使用 R2dbcTransactionManager
R2dbcTransactionManager
类是单个 R2DBC ConnectionFactory
的 ReactiveTransactionManager
实现。
它将指定 ConnectionFactory
的 R2DBC Connection
绑定到订阅者 Context
,
可能允许每个 ConnectionFactory
拥有一个订阅者 Connection
。
应用程序代码需要通过 ConnectionFactoryUtils.getConnection(ConnectionFactory)
而不是 R2DBC 的标准 ConnectionFactory.create()
来检索 R2DBC Connection
。
所有框架类(例如 DatabaseClient
)都隐式使用此策略。
如果未与事务管理器一起使用,查找策略的行为与 ConnectionFactory.create()
完全相同,
因此可以在任何情况下使用。