Coroutines
Kotlin Coroutines are lightweight threads that allow writing non-blocking code in an imperative way. On the language side, suspending functions provide an abstraction for asynchronous operations, while on the library side kotlinx.coroutines (https://github.com/Kotlin/kotlinx.coroutines) provides functions like async { } (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/async.html) and types like Flow.
Spring Framework provides support for Coroutines in the following scope:
- Deferred and Flow return value support in Spring MVC and WebFlux annotated @Controller
- Suspending function support in Spring MVC and WebFlux annotated @Controller
- WebFlux.fn coRouter { } DSL
- WebFlux CoWebFilter
- Suspending function and Flow support in RSocket @MessageMapping annotated methods
- Extensions for RSocketRequester
- Spring AOP
Dependencies
Coroutines support is enabled when the kotlinx-coroutines-core and kotlinx-coroutines-reactor dependencies are on the classpath:
build.gradle.kts
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
Versions 1.4.0 and above are supported.
How does Reactive translate to Coroutines?
For return values, the translation from Reactive to Coroutines APIs is as follows:
- fun handler(): Mono<Void> becomes suspend fun handler()
- fun handler(): Mono<T> becomes suspend fun handler(): T or suspend fun handler(): T?, depending on whether the Mono can be empty (with the advantage of being more statically typed)
- fun handler(): Flux<T> becomes fun handler(): Flow<T>
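The return-value mapping above can be sketched with the bridging functions from kotlinx-coroutines-reactor. This is a minimal illustration rather than what Spring does internally: the handler names are hypothetical, and it assumes kotlinx.coroutines 1.5 or later (where the Mono-specific awaitSingle and awaitSingleOrNull are available) with Reactor on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Reactive-style handlers (hypothetical, shown only for comparison).
fun reactiveGreeting(): Mono<String> = Mono.just("hello")
fun reactiveMaybe(): Mono<String> = Mono.empty()
fun reactiveNumbers(): Flux<Int> = Flux.just(1, 2, 3)

// Mono<T> that always emits a value -> suspend fun returning T.
suspend fun greeting(): String = reactiveGreeting().awaitSingle()

// Mono<T> that may complete empty -> suspend fun returning T?.
suspend fun maybe(): String? = reactiveMaybe().awaitSingleOrNull()

// Flux<T> -> Flow<T>; the function itself does not need to suspend.
fun numbers(): Flow<Int> = reactiveNumbers().asFlow()

fun main() = runBlocking {
    println(greeting())         // hello
    println(maybe())            // null
    println(numbers().toList()) // [1, 2, 3]
}
```

The nullable return type makes the "may be empty" case visible to the compiler, which is the static-typing advantage mentioned in the list.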
For input parameters:
- If laziness is not needed, fun handler(mono: Mono<T>) becomes fun handler(value: T), since a suspending function can be invoked to get the value parameter.
- If laziness is needed, fun handler(mono: Mono<T>) becomes fun handler(supplier: suspend () -> T) or fun handler(supplier: suspend () -> T?)
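The difference between the eager and lazy parameter styles can be illustrated with plain kotlinx-coroutines-core. The names loadName, greet, and greetIfEnabled are hypothetical, and greetIfEnabled is itself declared suspend here because it invokes the supplier directly; treat it as a sketch of the idea, not a Spring API.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical lookup standing in for a non-blocking call.
suspend fun loadName(): String {
    delay(10)
    return "Ada"
}

// Eager style: the caller has already resolved the value.
fun greet(value: String): String = "Hello, $value"

// Lazy style: the supplier runs only if the value is actually needed.
suspend fun greetIfEnabled(enabled: Boolean, supplier: suspend () -> String): String =
    if (enabled) "Hello, ${supplier()}" else "disabled"

fun main() = runBlocking {
    println(greet(loadName()))               // Hello, Ada

    var calls = 0
    val supplier: suspend () -> String = { calls++; loadName() }

    println(greetIfEnabled(false, supplier)) // disabled
    println(calls)                           // 0, the supplier was never invoked
    println(greetIfEnabled(true, supplier))  // Hello, Ada
    println(calls)                           // 1
}
```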
Flow is the Flux equivalent in the Coroutines world, suitable for hot or cold streams and finite or infinite streams, with the following main differences:
- Flow is push-based, while Flux is a push-pull hybrid
- Backpressure is implemented via suspending functions
- Flow has only a single suspending collect method, and operators are implemented as extensions
- Operators are easy to implement thanks to Coroutines
- Extensions allow adding custom operators to Flow
- Collect operations are suspending functions
- The map operator supports asynchronous operations (no need for flatMap) since it takes a suspending function parameter
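Two of the points above, custom operators as extensions and map accepting a suspending function, can be sketched as follows. The duplicate operator and slowSquare function are hypothetical names introduced for illustration, and the example assumes kotlinx.coroutines 1.6 or later, where collect accepts a lambda directly.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical custom operator written as an extension:
// it collects the upstream Flow and emits every element twice.
fun <T> Flow<T>.duplicate(): Flow<T> = flow {
    collect { value ->
        emit(value)
        emit(value)
    }
}

// Hypothetical asynchronous computation.
suspend fun slowSquare(n: Int): Int {
    delay(10)
    return n * n
}

fun main() = runBlocking {
    val result = flowOf(1, 2, 3)
        .map { slowSquare(it) } // map takes a suspending lambda, so no flatMap is needed
        .duplicate()
        .toList()               // terminal operations such as toList are suspending
    println(result)             // [1, 1, 4, 4, 9, 9]
}
```

Because emit suspends until the downstream collector has handled the value, a slow collector naturally slows the producer, which is how backpressure falls out of suspension.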
Read this blog post about Going Reactive with Spring, Coroutines and Kotlin Flow for more details, including how to run code concurrently with Coroutines.
Controllers
Here is an example of a Coroutines @RestController.
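import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody

// Constructor parameters are declared as properties so the handlers can use them.
@RestController
class CoroutinesRestController(
    private val client: WebClient,
    private val banner: Banner
) {

    // Suspending handler: delay() suspends without blocking a thread.
    @GetMapping("/suspend")
    suspend fun suspendingEndpoint(): Banner {
        delay(10)
        return banner
    }

    // Flow handler: emits two elements.
    @GetMapping("/flow")
    fun flowEndpoint() = flow {
        delay(10)
        emit(banner)
        delay(10)
        emit(banner)
    }

    // Deferred handler. GlobalScope is kept for brevity; kotlinx.coroutines marks it as a delicate API.
    @GetMapping("/deferred")
    fun deferredEndpoint() = GlobalScope.async {
        delay(10)
        banner
    }

    // Two requests executed one after the other.
    // retrieve() is used instead of the no-arg awaitExchange(), which is deprecated as of Spring Framework 5.3.
    @GetMapping("/sequential")
    suspend fun sequential(): List<Banner> {
        val banner1 = client
            .get()
            .uri("/suspend")
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .awaitBody<Banner>()
        val banner2 = client
            .get()
            .uri("/suspend")
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .awaitBody<Banner>()
        return listOf(banner1, banner2)
    }

    // Two requests executed concurrently inside a structured scope.
    @GetMapping("/parallel")
    suspend fun parallel(): List<Banner> = coroutineScope {
        val deferredBanner1: Deferred<Banner> = async {
            client
                .get()
                .uri("/suspend")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .awaitBody<Banner>()
        }
        val deferredBanner2: Deferred<Banner> = async {
            client
                .get()
                .uri("/suspend")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .awaitBody<Banner>()
        }
        listOf(deferredBanner1.await(), deferredBanner2.await())
    }

    @GetMapping("/error")
    suspend fun error() {
        throw IllegalStateException()
    }

    @GetMapping("/cancel")
    suspend fun cancel() {
        throw CancellationException()
    }
}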
View rendering with a @Controller is also supported.
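import kotlinx.coroutines.delay
import org.springframework.stereotype.Controller
import org.springframework.ui.Model
import org.springframework.ui.set
import org.springframework.web.bind.annotation.GetMapping

@Controller
class CoroutinesViewController(private val banner: Banner) {

    @GetMapping("/")
    suspend fun render(model: Model): String {
        delay(10)
        model["banner"] = banner // uses the Model.set operator extension
        return "index"
    }
}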
WebFlux.fn
Here is an example of a Coroutines router defined via the coRouter { } DSL and related handlers.
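import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.bodyValueAndAwait
import org.springframework.web.reactive.function.server.coRouter
import org.springframework.web.reactive.function.server.renderAndAwait

@Configuration
class RouterConfiguration {
    @Bean
    fun mainRouter(userHandler: UserHandler) = coRouter {
        GET("/", userHandler::listView)
        GET("/api/user", userHandler::listApi)
    }
}

class UserHandler(builder: WebClient.Builder) {
    private val client = builder.baseUrl("...").build()

    // retrieve() replaces the no-arg awaitExchange(), deprecated as of Spring Framework 5.3.
    suspend fun listView(request: ServerRequest): ServerResponse =
        ServerResponse.ok().renderAndAwait("users", mapOf("users" to
            client.get().uri("...").retrieve().awaitBody<User>()))

    // bodyValueAndAwait takes a single value; bodyAndAwait expects a Flow.
    suspend fun listApi(request: ServerRequest): ServerResponse =
        ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValueAndAwait(
            client.get().uri("...").retrieve().awaitBody<User>())
}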
Transactions
Transactions on Coroutines are supported via the programmatic variant of the Reactive transaction management provided as of Spring Framework 5.2.
For suspending functions, a TransactionalOperator.executeAndAwait extension is provided.
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

    // Both inserts run inside the same reactive transaction.
    suspend fun initDatabase() = operator.executeAndAwait {
        insertPerson1()
        insertPerson2()
    }

    private suspend fun insertPerson1() {
        // INSERT SQL statement
    }

    private suspend fun insertPerson2() {
        // INSERT SQL statement
    }
}
For Kotlin Flow, a Flow<T>.transactional extension is provided.
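import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

    // The whole Flow pipeline is collected within a transaction.
    fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

    private fun findPeople(): Flow<Person> {
        TODO("SELECT SQL statement")
    }

    private suspend fun updatePerson(person: Person): Person {
        TODO("UPDATE SQL statement")
    }
}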