Coroutines

Kotlin Coroutines are Kotlin lightweight threads that allow writing non-blocking code in an imperative way. On the language side, suspending functions provide an abstraction for asynchronous operations, while on the library side https://github.com/Kotlin/kotlinx.coroutines[kotlinx.coroutines] provides functions like https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/async.html[async { }] and types like Flow.

Spring Framework provides support for Coroutines in the following areas:

  • Deferred and Flow return values support in Spring MVC and WebFlux annotated @Controller

  • Suspending function support in Spring MVC and WebFlux annotated @Controller

  • Extensions for WebFlux client and server functional API.

  • WebFlux.fn coRouter { } DSL

  • WebFlux CoWebFilter

  • Suspending function and Flow support in RSocket @MessageMapping annotated methods

  • Extensions for RSocketRequester

  • Spring AOP

Dependencies

Coroutines support is enabled when the kotlinx-coroutines-core and kotlinx-coroutines-reactor dependencies are on the classpath:

build.gradle.kts

dependencies {
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

Versions 1.4.0 and above are supported.

How does Reactive translate to Coroutines?

For return values, the translation from Reactive to Coroutines APIs is the following:

  • fun handler(): Mono<Void> becomes suspend fun handler()

  • fun handler(): Mono<T> becomes suspend fun handler(): T or suspend fun handler(): T? depending on whether the Mono can be empty (with the advantage of being more statically typed)

  • fun handler(): Flux<T> becomes fun handler(): Flow<T>
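The return-value mappings above can be tried outside of Spring with the bridging functions from kotlinx-coroutines-reactor. The following is a minimal sketch, assuming kotlinx-coroutines-reactor 1.5 or later (for the Mono-specific awaitSingle and awaitSingleOrNull) and Reactor on the classpath. All function names in it are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Reactive-style functions (hypothetical names, for illustration only).
fun findNameMono(): Mono<String> = Mono.just("Ada")
fun findMissingMono(): Mono<String> = Mono.empty()
fun namesFlux(): Flux<String> = Flux.just("Ada", "Grace")

// Mono<T> that always emits a value -> suspend fun returning T.
suspend fun findName(): String = findNameMono().awaitSingle()

// Mono<T> that may complete empty -> suspend fun returning T?.
suspend fun findMissing(): String? = findMissingMono().awaitSingleOrNull()

// Flux<T> -> Flow<T>; note that the function itself is not suspending.
fun names(): Flow<String> = namesFlux().asFlow()

fun main() = runBlocking {
    println(findName())        // Ada
    println(findMissing())     // null
    println(names().toList())  // [Ada, Grace]
}
```

In a Spring controller the conversion is done by the framework, so handler code would normally declare only the suspend or Flow signature and not call these bridging functions itself.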

For input parameters:

  • If laziness is not needed, fun handler(mono: Mono<T>) becomes fun handler(value: T) since a suspending function can be invoked to get the value parameter.

  • If laziness is needed, fun handler(mono: Mono<T>) becomes fun handler(supplier: suspend () -> T) or fun handler(supplier: suspend () -> T?)
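The difference between the eager and the lazy parameter form can be illustrated with plain kotlinx.coroutines. This is only a sketch: loadValue, eagerHandler and lazyHandler are hypothetical names, and lazyHandler is declared suspend here so that it can invoke the supplier directly.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Counts how often the value was actually computed.
var loads = 0

// Hypothetical suspending source of the value.
suspend fun loadValue(): String {
    delay(10)
    loads++
    return "value"
}

// Eager form: the caller has already resolved the value.
fun eagerHandler(value: String): String = "eager:$value"

// Lazy form: the handler decides whether and when to invoke the supplier.
suspend fun lazyHandler(needed: Boolean, supplier: suspend () -> String): String =
    if (needed) "lazy:${supplier()}" else "lazy:skipped"

fun main() = runBlocking {
    println(eagerHandler(loadValue()))           // value computed before the call
    println(lazyHandler(false) { loadValue() })  // supplier is never invoked
    println(lazyHandler(true) { loadValue() })   // supplier invoked inside the handler
    println("loads = $loads")                    // loads = 2
}
```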

Flow is the Flux equivalent in the Coroutines world, suitable for hot or cold streams, finite or infinite streams, with the following main differences:

  • Flow is push-based while Flux is push-pull hybrid

  • Backpressure is implemented via suspending functions

  • Flow has only a single suspending collect method and operators are implemented as extensions

  • Operators are easy to implement thanks to Coroutines

  • Extensions allow adding custom operators to Flow

  • Collect operations are suspending functions

  • map operator supports asynchronous operation (no need for flatMap) since it takes a suspending function parameter
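Several of these points can be seen in a few lines of plain kotlinx.coroutines code. The sketch below is illustrative only and assumes kotlinx.coroutines 1.6 or later (where collect accepts a lambda without an extra import). The everyOther operator and the lookup function are hypothetical and not part of any library.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical suspending lookup, standing in for a remote call.
suspend fun lookup(id: Int): String {
    delay(10)
    return "item-$id"
}

// A custom operator written as an extension: emits every other element,
// starting with the first one.
fun <T> Flow<T>.everyOther(): Flow<T> = flow {
    var index = 0
    collect { value ->
        if (index++ % 2 == 0) emit(value)
    }
}

// Cold flow: the builder block runs again for each collector.
fun ids(): Flow<Int> = flow {
    // emit suspends until the downstream collector has processed the value,
    // which is how backpressure is expressed.
    for (id in 1..4) emit(id)
}

fun main() = runBlocking {
    // map takes a suspending lambda, so asynchronous work needs no flatMap.
    val items = ids().everyOther().map { lookup(it) }.toList()
    println(items) // [item-1, item-3]
}
```

Nothing runs until the terminal toList() call collects the flow, which reflects the single suspending collect method mentioned in the list.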

Read this blog post about Going Reactive with Spring, Coroutines and Kotlin Flow for more details, including how to run code concurrently with Coroutines.

Controllers

Here is an example of a Coroutines @RestController.

@RestController
class CoroutinesRestController(private val client: WebClient, private val banner: Banner) {

	@GetMapping("/suspend")
	suspend fun suspendingEndpoint(): Banner {
		delay(10)
		return banner
	}

	@GetMapping("/flow")
	fun flowEndpoint() = flow {
		delay(10)
		emit(banner)
		delay(10)
		emit(banner)
	}

	@GetMapping("/deferred")
	fun deferredEndpoint() = GlobalScope.async {
		delay(10)
		banner
	}

	@GetMapping("/sequential")
	suspend fun sequential(): List<Banner> {
		val banner1 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		val banner2 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		return listOf(banner1, banner2)
	}

	@GetMapping("/parallel")
	suspend fun parallel(): List<Banner> = coroutineScope {
		val deferredBanner1: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		val deferredBanner2: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		listOf(deferredBanner1.await(), deferredBanner2.await())
	}

	@GetMapping("/error")
	suspend fun error() {
		throw IllegalStateException()
	}

	@GetMapping("/cancel")
	suspend fun cancel() {
		throw CancellationException()
	}

}
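The difference between the /sequential and /parallel endpoints comes from how the coroutines are structured rather than from anything Spring-specific. The following sketch reproduces the pattern with plain kotlinx.coroutines. It replaces the WebClient call with a hypothetical fetchBanner function that merely delays, so the timings are only indicative.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for the remote call made through WebClient.
suspend fun fetchBanner(id: Int): String {
    delay(200)
    return "banner-$id"
}

// The second call starts only after the first one has completed.
suspend fun sequential(): List<String> = listOf(fetchBanner(1), fetchBanner(2))

// Both calls start immediately; coroutineScope returns once both children are done.
suspend fun parallel(): List<String> = coroutineScope {
    val first = async { fetchBanner(1) }
    val second = async { fetchBanner(2) }
    listOf(first.await(), second.await())
}

fun main() = runBlocking {
    val sequentialMs = measureTimeMillis { sequential() }
    val parallelMs = measureTimeMillis { parallel() }
    // Roughly 400 ms versus roughly 200 ms is expected; exact figures vary by machine.
    println("sequential: $sequentialMs ms, parallel: $parallelMs ms")
}
```

Because both async calls are children of coroutineScope, a failure in one of them cancels the other and is rethrown to the caller, which is usually what a request handler wants.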

View rendering with a @Controller is also supported.

@Controller
class CoroutinesViewController(private val banner: Banner) {

	@GetMapping("/")
	suspend fun render(model: Model): String {
		delay(10)
		model["banner"] = banner
		return "index"
	}
}

WebFlux.fn

Here is an example of a Coroutines router defined via the coRouter { } DSL and related handlers.

@Configuration
class RouterConfiguration {

	@Bean
	fun mainRouter(userHandler: UserHandler) = coRouter {
		GET("/", userHandler::listView)
		GET("/api/user", userHandler::listApi)
	}
}

class UserHandler(builder: WebClient.Builder) {

	private val client = builder.baseUrl("...").build()

	suspend fun listView(request: ServerRequest): ServerResponse =
			ServerResponse.ok().renderAndAwait("users", mapOf("users" to
			client.get().uri("...").awaitExchange().awaitBody<User>()))

	suspend fun listApi(request: ServerRequest): ServerResponse =
				ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
				client.get().uri("...").awaitExchange().awaitBody<User>())
}

Transactions

Transactions on Coroutines are supported via the programmatic variant of the Reactive transaction management provided as of Spring Framework 5.2.

For suspending functions, a TransactionalOperator.executeAndAwait extension is provided.

import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

    suspend fun initDatabase() = operator.executeAndAwait {
        insertPerson1()
        insertPerson2()
    }

    private suspend fun insertPerson1() {
        // INSERT SQL statement
    }

    private suspend fun insertPerson2() {
        // INSERT SQL statement
    }
}

For Kotlin Flow, a Flow<T>.transactional extension is provided.

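import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.transactional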

class PersonRepository(private val operator: TransactionalOperator) {

    fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

    private fun findPeople(): Flow<Person> {
        // SELECT SQL statement
    }

    private suspend fun updatePerson(person: Person): Person {
        // UPDATE SQL statement
    }
}