WebSockets

本参考文档的这一部分介绍了对响应式栈 WebSocket 消息的支持。 Unresolved directive in webflux-websocket.adoc - include::partial$web/websocket-intro.adoc[leveloffset=+1]

WebSocket API

Spring Framework 提供了一个 WebSocket API,您可以使用它来编写处理 WebSocket 消息的客户端和服务器端应用程序。

服务器

要创建 WebSocket 服务器,您可以首先创建一个 WebSocketHandler。以下示例展示了如何实现:

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然后您可以将其映射到一个 URL:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用 WebFlux 配置,则无需进一步操作;否则,如果未使用 WebFlux 配置,您将需要声明一个 WebSocketHandlerAdapter,如下所示:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

WebSocketHandlerhandle 方法接收 WebSocketSession 并返回 Mono<Void>,以指示会话的应用处理何时完成。会话通过两个流进行处理,一个用于入站消息,一个用于出站消息。下表描述了处理这些流的两种方法:

WebSocketSession 方法 描述

Flux<WebSocketMessage> receive()

提供对入站消息流的访问,并在连接关闭时完成。

Mono<Void> send(Publisher<WebSocketMessage>)

接收一个用于出站消息的源,写入消息,并返回一个 Mono<Void>,该 Mono<Void> 在源完成且写入完成后完成。

WebSocketHandler 必须将入站和出站流组合成一个统一的流,并返回一个反映该流完成的 Mono<Void>。根据应用程序要求,统一流在以下情况之一完成:

  • 入站或出站消息流完成。

  • 入站流完成(即连接关闭),而出站流是无限的。

  • 在选定的点,通过 WebSocketSessionclose 方法。

当入站和出站消息流组合在一起时,无需检查连接是否打开,因为 Reactive Streams 信号会结束活动。入站流接收完成或错误信号,出站流接收取消信号。

处理程序最基本的实现是处理入站流的实现。以下示例展示了这样的实现:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			[id="CO1-1"][id="CO1-1"][id="CO1-1"](1)
				.doOnNext(message -> {
					// ...					[id="CO1-2"][id="CO1-2"][id="CO1-2"](2)
				})
				.concatMap(message -> {
					// ...					[id="CO1-3"][id="CO1-3"][id="CO1-3"](3)
				})
				.then();					[id="CO1-4"][id="CO1-4"][id="CO1-4"](4)
	}
}
<1>  访问入站消息流。
<1>  对每条消息执行操作。
<1>  执行使用消息内容的嵌套异步操作。
<1>  返回一个 `Mono<Void>`,该 `Mono<Void>` 在接收完成时完成。
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()			[id="CO2-1"][id="CO1-5"][id="CO2-1"](1)
				.doOnNext {
					// ...					[id="CO2-2"][id="CO1-6"][id="CO2-2"](2)
				}
				.concatMap {
					// ...					[id="CO2-3"][id="CO1-7"][id="CO2-3"](3)
				}
				.then()						[id="CO2-4"][id="CO1-8"][id="CO2-4"](4)
	}
}
<1>  访问入站消息流。
<1>  对每条消息执行操作。
<1>  执行使用消息内容的嵌套异步操作。
<1>  返回一个 `Mono<Void>`,该 `Mono<Void>` 在接收完成时完成。

对于嵌套的异步操作,您可能需要在使用池化数据缓冲区的底层服务器(例如 Netty)上调用 message.retain()。否则,数据缓冲区可能会在您有机会读取数据之前被释放。有关更多背景信息,请参阅 数据缓冲区和编解码器

以下实现结合了入站和出站流:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				[id="CO3-1"][id="CO1-9"][id="CO3-1"](1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	[id="CO3-2"][id="CO1-10"][id="CO3-2"](2)

		return session.send(output);									[id="CO3-3"][id="CO1-11"][id="CO3-3"](3)
	}
}
<1>  处理入站消息流。
<1>  创建出站消息,生成组合流。
<1>  返回一个 `Mono<Void>`,该 `Mono<Void>` 在我们继续接收时不会完成。
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()						[id="CO4-1"][id="CO1-12"][id="CO4-1"](1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }	[id="CO4-2"][id="CO1-13"][id="CO4-2"](2)

		return session.send(output)							[id="CO4-3"][id="CO1-14"][id="CO4-3"](3)
	}
}
<1>  处理入站消息流。
<1>  创建出站消息,生成组合流。
<1>  返回一个 `Mono<Void>`,该 `Mono<Void>` 在我们继续接收时不会完成。

入站和出站流可以相互独立,仅在完成时才连接,如下例所示:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								[id="CO5-1"]1
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	[id="CO5-2"]2

		return Mono.zip(input, output).then();								[id="CO5-3"]3
	}
}
<1>  处理入站消息流。
<1>  发送出站消息。
<1>  连接流并返回一个 `Mono<Void>`,该 `Mono<Void>` 在任一流结束时完成。
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									[id="CO6-1"][id="CO1-15"][id="CO6-1"](1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		[id="CO6-2"][id="CO1-16"][id="CO6-2"](2)

		return Mono.zip(input, output).then()							[id="CO6-3"][id="CO1-17"][id="CO6-3"](3)
	}
}
<1>  处理入站消息流。
<1>  发送出站消息。
<1>  连接流并返回一个 `Mono<Void>`,该 `Mono<Void>` 在任一流结束时完成。

DataBuffer

DataBuffer 是 WebFlux 中字节缓冲区的表示。参考文档的 Spring Core 部分在 数据缓冲区和编解码器 一节中对此有更多介绍。需要理解的关键点是,在某些服务器(如 Netty)上,字节缓冲区是池化和引用计数的,并且在消耗后必须释放,以避免内存泄漏。

在 Netty 上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不会被释放,则必须使用 DataBufferUtils.retain(dataBuffer),然后在缓冲区被消耗时使用 DataBufferUtils.release(dataBuffer)

握手

WebSocketHandlerAdapter 委托给 WebSocketService。默认情况下,它是一个 HandshakeWebSocketService 实例,该实例对 WebSocket 请求执行基本检查,然后使用正在使用的服务器的 RequestUpgradeStrategy。目前,已内置支持 Reactor Netty、Tomcat、Jetty 和 Undertow。

HandshakeWebSocketService 公开了一个 sessionAttributePredicate 属性,该属性允许设置一个 Predicate<String> 来从 WebSession 中提取属性并将其插入到 WebSocketSession 的属性中。

服务器配置

每个服务器的 RequestUpgradeStrategy 都公开了特定于底层 WebSocket 服务器引擎的配置。使用 WebFlux Java 配置时,您可以自定义此类属性,如 WebFlux 配置 的相应部分所示;否则,如果未使用 WebFlux 配置,请使用以下方法:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

检查您服务器的升级策略以查看可用的选项。目前,只有 Tomcat 和 Jetty 公开了此类选项。

CORS

配置 CORS 并限制对 WebSocket 端点的访问的最简单方法是让您的 WebSocketHandler 实现 CorsConfigurationSource 并返回一个包含允许的源、标头和其他详细信息的 CorsConfiguration。如果您无法这样做,您还可以在 SimpleUrlHandler 上设置 corsConfigurations 属性,以按 URL 模式指定 CORS 设置。如果两者都指定,它们将通过 CorsConfiguration 上的 combine 方法进行组合。

客户端

Spring WebFlux 提供了一个 WebSocketClient 抽象,其实现包括 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)。

Tomcat 客户端实际上是标准 Java 客户端的扩展,在 WebSocketSession 处理中具有一些额外的功能,以利用 Tomcat 特定的 API 来暂停接收消息以进行背压。

要启动 WebSocket 会话,您可以创建客户端实例并使用其 execute 方法:

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://localhost:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

一些客户端,例如 Jetty,实现了 Lifecycle,在使用之前需要停止和启动。所有客户端都有与底层 WebSocket 客户端配置相关的构造函数选项。