Coroutines
Kotlin Coroutines are lightweight threads that allow writing non-blocking code in an imperative way. On the language side, suspending functions provide an abstraction for asynchronous operations, while on the library side kotlinx.coroutines (https://github.com/Kotlin/kotlinx.coroutines) provides functions like async { } (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/async.html) and types like Flow.
Spring Framework provides support for Coroutines in the following scope:
- Deferred and Flow return value support in Spring MVC and WebFlux annotated @Controller
- Suspending function support in Spring MVC and WebFlux annotated @Controller
- WebFlux.fn coRouter { } DSL
- WebFlux CoWebFilter
- Suspending function and Flow support in RSocket @MessageMapping annotated methods
- Extensions for RSocketRequester
- Spring AOP
Dependencies
Coroutines support is enabled when the kotlinx-coroutines-core and kotlinx-coroutines-reactor dependencies are on the classpath:
build.gradle.kts
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
Versions 1.4.0 and above are supported.
How does Reactive translate to Coroutines?
For return values, the translation from Reactive to Coroutines APIs is the following:
- fun handler(): Mono<Void> becomes suspend fun handler()
- fun handler(): Mono<T> becomes suspend fun handler(): T or suspend fun handler(): T?, depending on whether the Mono can be empty or not (with the advantage of being more statically typed)
- fun handler(): Flux<T> becomes fun handler(): Flow<T>
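As a rough illustration of the return value mapping above, the following sketch bridges a Reactor-based API to coroutines with the kotlinx-coroutines-reactor and kotlinx-coroutines-reactive extensions. The functions findName and findAllNames are hypothetical stand-ins for a reactive repository or client, and the Mono-specific awaitSingle and awaitSingleOrNull extensions assume kotlinx.coroutines 1.5 or later.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical reactive API: emits a name for id 1, completes empty otherwise.
fun findName(id: Int): Mono<String> =
    if (id == 1) Mono.just("alice") else Mono.empty()

// Hypothetical reactive API returning several values.
fun findAllNames(): Flux<String> = Flux.just("alice", "bob")

// Mono<T> that is expected to emit -> suspend fun returning T
suspend fun requireName(id: Int): String = findName(id).awaitSingle()

// Mono<T> that may be empty -> suspend fun returning T?
suspend fun nameOrNull(id: Int): String? = findName(id).awaitSingleOrNull()

// Flux<T> -> Flow<T>
fun names(): Flow<String> = findAllNames().asFlow()

fun main(): Unit = runBlocking {
    println(requireName(1))     // prints alice
    println(nameOrNull(2))      // prints null
    println(names().toList())   // prints [alice, bob]
}
```

The nullable return type makes the "may be empty" case visible to the compiler, which is the static typing advantage mentioned in the list.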
For input parameters:
- If laziness is not needed, fun handler(mono: Mono<T>) becomes fun handler(value: T), since a suspending function can be invoked to get the value parameter.
- If laziness is needed, fun handler(mono: Mono<T>) becomes fun handler(supplier: suspend () -> T) or fun handler(supplier: suspend () -> T?)
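The difference between the eager and the lazy input parameter forms can be sketched as follows. All names here (loadGreeting, eagerHandler, lazyHandler, the lookups counter) are hypothetical and exist only to show when the suspending lookup actually runs.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Counts how often the hypothetical lookup below is actually executed.
var lookups = 0

// Hypothetical suspending lookup, standing in for a remote call.
suspend fun loadGreeting(): String {
    lookups++
    delay(10)
    return "hello"
}

// Eager form: the caller has already resolved the value before the call.
fun eagerHandler(value: String, enabled: Boolean): String =
    if (enabled) value else "disabled"

// Lazy form: the value is computed only if the handler invokes the supplier.
suspend fun lazyHandler(supplier: suspend () -> String, enabled: Boolean): String =
    if (enabled) supplier() else "disabled"

fun main(): Unit = runBlocking {
    // The lookup runs here even though the handler ignores the value.
    println(eagerHandler(loadGreeting(), enabled = false))    // prints disabled
    // The supplier is never invoked, so the lookup is skipped.
    println(lazyHandler({ loadGreeting() }, enabled = false)) // prints disabled
    println(lookups)                                          // prints 1
}
```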
Flow is the equivalent of Flux in the Coroutines world, suitable for hot or cold streams and finite or infinite streams, with the following main differences:
- Flow is push-based, while Flux is a push-pull hybrid
- Backpressure is implemented via suspending functions
- Flow has only a single suspending collect method, and operators are implemented as extensions
- Operators are easy to implement thanks to Coroutines
- Extensions allow adding custom operators to Flow
- Collect operations are suspending functions
- The map operator supports asynchronous operations (no need for flatMap) since it takes a suspending function parameter
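A minimal sketch of several of these points, using only kotlinx.coroutines: a custom operator added as an extension, a map step that calls a suspending function, and a suspending terminal operation. The names describe, everyOther, and ids are hypothetical.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.runBlocking

// Hypothetical suspending lookup, standing in for an asynchronous call.
suspend fun describe(id: Int): String {
    delay(10)
    return "item-$id"
}

// Hypothetical custom operator, written as an extension on Flow and
// composed from existing operators: keeps elements at even indices.
fun <T> Flow<T>.everyOther(): Flow<T> =
    withIndex().filter { it.index % 2 == 0 }.map { it.value }

// A cold flow: the block runs each time the flow is collected.
fun ids(): Flow<Int> = flow {
    for (id in 1..4) emit(id) // emit is a suspending call
}

fun main(): Unit = runBlocking {
    val result = ids()
        .everyOther()          // keeps 1 and 3
        .map { describe(it) }  // map accepts a suspending lambda
        .toList()              // terminal operations are suspending
    println(result)            // prints [item-1, item-3]
}
```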
Read this blog post about Going Reactive with Spring, Coroutines and Kotlin Flow for more details, including how to run code concurrently with Coroutines.
Controllers
Here is an example of a Coroutines @RestController.
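import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody

@RestController
class CoroutinesRestController(
    // Declared as properties so that the handler methods can use them
    private val client: WebClient,
    private val banner: Banner
) {

    // Suspending handler: returns a single value
    @GetMapping("/suspend")
    suspend fun suspendingEndpoint(): Banner {
        delay(10)
        return banner
    }

    // Flow handler: emits several values over time
    @GetMapping("/flow")
    fun flowEndpoint() = flow {
        delay(10)
        emit(banner)
        delay(10)
        emit(banner)
    }

    // Deferred handler: GlobalScope is used here only to produce a Deferred return value
    @GetMapping("/deferred")
    fun deferredEndpoint() = GlobalScope.async {
        delay(10)
        banner
    }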
    // The two requests run one after the other.
    // retrieve() is used instead of the no-argument awaitExchange(), deprecated since Spring Framework 5.3.
    @GetMapping("/sequential")
    suspend fun sequential(): List<Banner> {
        val banner1 = client
            .get()
            .uri("/suspend")
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .awaitBody<Banner>()
        val banner2 = client
            .get()
            .uri("/suspend")
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .awaitBody<Banner>()
        return listOf(banner1, banner2)
    }

    // The two requests run concurrently; coroutineScope waits for both to complete
    @GetMapping("/parallel")
    suspend fun parallel(): List<Banner> = coroutineScope {
        val deferredBanner1: Deferred<Banner> = async {
            client
                .get()
                .uri("/suspend")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .awaitBody<Banner>()
        }
        val deferredBanner2: Deferred<Banner> = async {
            client
                .get()
                .uri("/suspend")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .awaitBody<Banner>()
        }
        listOf(deferredBanner1.await(), deferredBanner2.await())
    }
    @GetMapping("/error")
    suspend fun error() {
        throw IllegalStateException()
    }

    @GetMapping("/cancel")
    suspend fun cancel() {
        throw CancellationException()
    }
}
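To make the difference between the /sequential and /parallel handlers concrete without a running server, here is a small standalone sketch. The function fetchBanner is a hypothetical stand-in for the WebClient call, with delay simulating network latency.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a remote call; delay simulates latency.
suspend fun fetchBanner(name: String): String {
    delay(200)
    return "banner-$name"
}

// The second call starts only after the first one has returned.
suspend fun sequential(): List<String> =
    listOf(fetchBanner("a"), fetchBanner("b"))

// Both calls are started before either result is awaited.
suspend fun parallel(): List<String> = coroutineScope {
    val first = async { fetchBanner("a") }
    val second = async { fetchBanner("b") }
    listOf(first.await(), second.await())
}

fun main(): Unit = runBlocking {
    val sequentialMillis = measureTimeMillis { sequential() }
    val parallelMillis = measureTimeMillis { parallel() }
    println("sequential: $sequentialMillis ms, parallel: $parallelMillis ms")
}
```

Both variants return the same list. With the simulated latency above, the sequential version should take roughly the sum of the two delays, and the parallel version roughly the longer of the two.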
View rendering with a @Controller is also supported.
@Controller
class CoroutinesViewController(private val banner: Banner) {

    @GetMapping("/")
    suspend fun render(model: Model): String {
        delay(10)
        model["banner"] = banner // requires the org.springframework.ui.set extension
        return "index"
    }
}
WebFlux.fn
Here is an example of a Coroutines router defined via the coRouter { } DSL and related handlers.
@Configuration
class RouterConfiguration {

    @Bean
    fun mainRouter(userHandler: UserHandler) = coRouter {
        GET("/", userHandler::listView)
        GET("/api/user", userHandler::listApi)
    }
}

class UserHandler(builder: WebClient.Builder) {

    private val client = builder.baseUrl("...").build()

    // Renders the "users" view with the fetched value in the model
    suspend fun listView(request: ServerRequest): ServerResponse =
        ServerResponse.ok().renderAndAwait("users", mapOf("users" to
            client.get().uri("...").retrieve().awaitBody<User>()))

    // bodyValueAndAwait writes a single value; bodyAndAwait expects a Flow
    suspend fun listApi(request: ServerRequest): ServerResponse =
        ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValueAndAwait(
            client.get().uri("...").retrieve().awaitBody<User>())
}
Transactions
Transactions on Coroutines are supported via the programmatic variant of the Reactive transaction management provided as of Spring Framework 5.2.
For suspending functions, a TransactionalOperator.executeAndAwait extension is provided.
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

    // Both inserts run inside the same transaction
    suspend fun initDatabase() = operator.executeAndAwait {
        insertPerson1()
        insertPerson2()
    }

    private suspend fun insertPerson1() {
        // INSERT SQL statement
    }

    private suspend fun insertPerson2() {
        // INSERT SQL statement
    }
}
For Kotlin Flow, a Flow<T>.transactional extension is provided.
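import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

    // The whole flow, including each update, runs inside one transaction
    fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

    private fun findPeople(): Flow<Person> {
        // SELECT SQL statement
    }

    private suspend fun updatePerson(person: Person): Person {
        // UPDATE SQL statement
    }
}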