WebSockets 支持
从 4.1 版本开始,Spring Integration 支持 WebSocket。
它基于 Spring Framework 的 web-socket
模块的架构、基础设施和 API。
因此,Spring WebSocket 的许多组件(例如 SubProtocolHandler
或 WebSocketClient
)和配置选项(例如 @EnableWebSocketMessageBroker
)可以在 Spring Integration 中重用。
有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket 支持 章节。
您需要将此依赖项添加到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:{project-version}"
对于服务器端,必须显式包含 org.springframework:spring-webmvc
依赖项。
Spring Framework WebSocket 基础设施基于 Spring 消息传递基础,并提供了一个基于 Spring Integration 使用的相同 MessageChannel
实现和 MessageHandler
实现(以及一些 POJO 方法注解映射)的基本消息传递框架。
因此,Spring Integration 可以直接参与 WebSocket 流,即使没有 WebSocket 适配器。
为此,您可以配置一个带有适当注解的 Spring Integration @MessagingGateway
,如下例所示:
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
概述
由于 WebSocket 协议本质上是流式的,并且我们可以同时向 WebSocket 发送和接收消息,因此我们可以处理适当的 WebSocketSession
,无论是在客户端还是服务器端。
为了封装连接管理和 WebSocketSession
注册,提供了 IntegrationWebSocketContainer
,其中包含 ClientWebSocketContainer
和 ServerWebSocketContainer
实现。
得益于 WebSocket API 及其在 Spring Framework 中的实现(具有许多扩展),服务器端和客户端都使用相同的类(当然,从 Java 的角度来看)。
因此,大多数连接和 WebSocketSession
注册选项在两端都是相同的。
这使我们能够重用许多配置项和基础设施钩子来在服务器端和客户端构建 WebSocket 应用程序。
以下示例展示了组件如何同时服务于这两种目的:
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
IntegrationWebSocketContainer
旨在实现双向消息传递,并且可以在入站和出站通道适配器(见下文)之间共享,在使用单向(发送或接收)WebSocket 消息传递时,可以仅从其中一个引用它。
它可以在没有任何通道适配器的情况下使用,但在这种情况下,IntegrationWebSocketContainer
仅充当 WebSocketSession
注册表。
|
从 6.1 版本开始,ClientWebSocketContainer
可以使用提供的 URI
进行配置,而不是 uriTemplate
和 uriVariables
的组合。
这在 URI 的某些部分需要自定义编码的情况下非常有用。
请参阅 UriComponentsBuilder
API 以方便使用。
WebSocket 入站通道适配器
WebSocketInboundChannelAdapter
实现了 WebSocketSession
交互的接收部分。
您必须为其提供一个 IntegrationWebSocketContainer
,并且适配器将自身注册为 WebSocketListener
以处理传入消息和 WebSocketSession
事件。
在 |
对于 WebSocket 子协议,WebSocketInboundChannelAdapter
可以配置 SubProtocolHandlerRegistry
作为第二个构造函数参数。
适配器委托给 SubProtocolHandlerRegistry
以确定接受的 WebSocketSession
的适当 SubProtocolHandler
,并根据子协议实现将 WebSocketMessage
转换为 Message
。
默认情况下, |
WebSocketInboundChannelAdapter
仅接受并发送到底层集成流的 Message
实例,这些实例具有 SimpMessageType.MESSAGE
或空的 simpMessageType
头。
所有其他 Message
类型都通过从 SubProtocolHandler
实现(例如 StompSubProtocolHandler
)发出的 ApplicationEvent
实例处理。
在服务器端,如果存在 @EnableWebSocketMessageBroker
配置,您可以配置 WebSocketInboundChannelAdapter
并使用 useBroker = true
选项。
在这种情况下,所有 non-MESSAGE
Message
类型都委托给提供的 AbstractBrokerMessageHandler
。
此外,如果代理中继配置了目标前缀,则与代理目标匹配的消息将路由到 AbstractBrokerMessageHandler
,而不是 WebSocketInboundChannelAdapter
的 outputChannel
。
如果 useBroker = false
并且收到的消息是 SimpMessageType.CONNECT
类型,WebSocketInboundChannelAdapter
会立即向 WebSocketSession
发送 SimpMessageType.CONNECT_ACK
消息,而不会将其发送到通道。
Spring 的 WebSocket 支持允许只配置一个代理中继。
因此,我们不需要 |
有关更多配置选项,请参阅 web-sockets-namespace。
WebSocket 出站通道适配器
WebSocketOutboundChannelAdapter
:
-
接受来自其
MessageChannel
的 Spring Integration 消息。 -
从
MessageHeaders
中确定WebSocketSession
的id
。 -
从提供的
IntegrationWebSocketContainer
中检索WebSocketSession
。 -
将
WebSocketMessage
的转换和发送工作委托给提供的SubProtocolHandlerRegistry
中的相应SubProtocolHandler
。
在客户端,WebSocketSession
id
消息头不是必需的,因为 ClientWebSocketContainer
只处理单个连接及其 WebSocketSession
。
要使用 STOMP 子协议,您应该使用 StompSubProtocolHandler
配置此适配器。
然后,您可以使用 StompHeaderAccessor.create(StompCommand…)
和 MessageBuilder
,或者只使用 HeaderEnricher
(请参阅 Header Enricher),向此适配器发送任何 STOMP 消息类型。
本章的其余部分主要介绍附加配置选项。
WebSockets 命名空间支持
Spring Integration WebSocket 命名空间包含本章其余部分描述的几个组件。 要将其包含在您的配置中,请在您的应用程序上下文配置文件中使用以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container>
属性
以下列表显示了 <int-websocket:client-container>
元素可用的属性:
<int-websocket:client-container
id="" [id="CO1-1"]1
client="" [id="CO1-2"]2
uri="" [id="CO1-3"]3
uri-variables="" [id="CO1-4"]4
origin="" [id="CO1-5"]5
send-time-limit="" [id="CO1-6"]6
send-buffer-size-limit="" [id="CO1-7"]7
send-buffer-overflow-strategy="" [id="CO1-8"]8
auto-startup="" [id="CO1-9"]9
phase=""> [id="CO1-10"]10
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> [id="CO1-11"]11
</int-websocket:client-container>
<1> 组件 bean 名称。 <1> `WebSocketClient` bean 引用。 <1> 目标 WebSocket 服务的 `uri` 或 `uriTemplate`。 如果您将其用作带有 URI 变量占位符的 `uriTemplate`,则 `uri-variables` 属性是必需的。 <1> `uri` 属性值中 URI 变量占位符的逗号分隔值。 这些值根据它们在 `uri` 中的顺序替换到占位符中。 请参阅 link:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/util/UriComponents.html#expand-java.lang.Object[`UriComponents.expand(Object…uriVariableValues)`]。 <1> `Origin` 握手 HTTP 头值。 <1> WebSocket 会话“发送”超时限制。 默认为 `10000`。 <1> WebSocket 会话“发送”消息大小限制。 默认为 `524288`。 <1> WebSocket 会话发送缓冲区溢出策略, 用于确定当会话的出站消息缓冲区达到 `send-buffer-size-limit` 时应采取的行为。 有关可能的值和更多详细信息,请参阅 `ConcurrentWebSocketSessionDecorator.OverflowStrategy`。 <1> 布尔值,指示此端点是否应自动启动。 默认为 `false`,假定此容器是从 <<WebSocket inbound adapter,web-socket-inbound-adapter>> 启动的。 <1> 此端点应启动和停止的生命周期阶段。 值越低,此端点启动越早,停止越晚。 默认值为 `Integer.MAX_VALUE`。 值可以是负数。 请参阅 link:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/context/SmartLifecycle.html[`SmartLifeCycle`]。 <1> 与握手请求一起使用的 `HttpHeaders` 的 `Map`。
<int-websocket:server-container>
属性
以下列表显示了 <int-websocket:server-container>
元素可用的属性:
<int-websocket:server-container
id="" [id="CO2-1"]1
path="" [id="CO2-2"]2
handshake-handler="" [id="CO2-3"]3
handshake-interceptors="" [id="CO2-4"]4
decorator-factories="" [id="CO2-5"]5
send-time-limit="" [id="CO2-6"]6
send-buffer-size-limit="" [id="CO2-7"]7
send-buffer-overflow-strategy="" [id="CO2-8"]8
allowed-origins=""> [id="CO2-9"]9
<int-websocket:sockjs
client-library-url="" [id="CO2-10"]10
stream-bytes-limit="" [id="CO2-11"]11
session-cookie-needed="" [id="CO2-12"]12
heartbeat-time="" [id="CO2-13"]13
disconnect-delay="" [id="CO2-14"]14
message-cache-size="" [id="CO2-15"]15
websocket-enabled="" [id="CO2-16"]16
scheduler="" [id="CO2-17"]17
message-codec="" [id="CO2-18"]18
transport-handlers="" [id="CO2-19"]19
suppress-cors="true" /> [id="CO2-20"]20
</int-websocket:server-container>
<1> 组件 bean 名称。 <1> 将特定请求映射到 `WebSocketHandler` 的路径(或逗号分隔的路径)。 支持精确路径映射 URI(例如 `/myPath`)和 ant 样式路径模式(例如 `/myPath/**`)。 <1> `HandshakeHandler` bean 引用。 默认为 `DefaultHandshakeHandler`。 <1> `HandshakeInterceptor` bean 引用的列表。 <1> 一个或多个工厂 (`WebSocketHandlerDecoratorFactory`) 的列表,这些工厂装饰用于处理 WebSocket 消息的处理程序。 这对于某些高级用例可能很有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。 有关更多信息,请参阅 link:https://docs.spring.io/spring-session/docs/current/reference/html5/#websocket[Spring Session Project]。 <1> 请参阅 <<`<int-websocket:client-container>`,websocket-client-container-attributes>> 上的相同选项。 <1> 请参阅 <<`<int-websocket:client-container>`,websocket-client-container-attributes>> 上的相同选项。 <1> WebSocket 会话发送缓冲区溢出策略, 用于确定当会话的出站消息缓冲区达到 `send-buffer-size-limit` 时应采取的行为。 有关可能的值和更多详细信息,请参阅 `ConcurrentWebSocketSessionDecorator.OverflowStrategy`。 <1> 允许的 origin 头值。 您可以将多个 origin 指定为逗号分隔列表。 此检查主要设计用于浏览器客户端。 其他类型的客户端无法修改 origin 头值。 当启用 SockJS 并且限制了允许的 origin 时,不使用 origin 头进行跨域请求的传输类型(`jsonp-polling`、`iframe-xhr-polling`、`iframe-eventsource` 和 `iframe-htmlfile`)将被禁用。 因此,不支持 IE6 和 IE7,IE8 和 IE9 仅在没有 cookie 的情况下支持。 默认情况下,允许所有 origin。 <1> 没有原生跨域通信的传输(例如 `eventsource` 和 `htmlfile`)必须从“`foreign`”域获取一个简单页面,放在一个不可见的 iframe 中,以便 iframe 中的代码可以从 SockJS 服务器的本地域运行。 由于 iframe 需要加载 SockJS javascript 客户端库,此属性允许您指定从何处加载它。 默认情况下,它指向 `https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js`。 但是,您也可以将其设置为指向应用程序提供的 URL。 请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。 例如,假设 SockJS 端点映射到 `/sockjs` 并且生成的 iframe URL 是 `/sockjs/iframe.html`,则相对 URL 必须以 "../../" 开头才能遍历到 SockJS 映射上方的位置。 对于基于前缀的 servlet 映射,您可能需要多一个遍历。 <1> 在关闭之前,可以通过单个 HTTP 流请求发送的最小字节数。 默认为 `128K`(即 128*1024 或 131072 字节)。 <1> SockJs `/info` 端点响应中的 `cookie_needed` 值。 此属性指示应用程序是否需要 `JSESSIONID` cookie 才能正常运行(例如,用于负载均衡或 Java Servlet 容器中 HTTP 会话的使用)。 <1> 服务器未发送任何消息后,服务器应向客户端发送心跳帧以保持连接不中断的时间(以毫秒为单位)。 默认值为 `25,000`(25 秒)。 <1> 客户端在没有接收连接(即服务器可以向客户端发送数据的活动连接)后,被视为断开连接的时间(以毫秒为单位)。 默认值为 `5000`。 <1> 会话在等待客户端的下一个 HTTP 轮询请求时可以缓存的服务器到客户端消息的数量。 默认大小为 `100`。 <1> 某些负载均衡器不支持 WebSockets。 将此选项设置为 `false` 以在服务器端禁用 WebSocket 传输。 默认值为 `true`。 <1> `TaskScheduler` bean 引用。 如果未提供值,则会创建一个新的 `ThreadPoolTaskScheduler` 实例。 此调度器实例用于调度心跳消息。 <1> 用于编码和解码 SockJS 消息的 `SockJsMessageCodec` bean 引用。 默认情况下,使用 `Jackson2SockJsMessageCodec`,这需要类路径上存在 Jackson 库。 <1> `TransportHandler` bean 引用的列表。 <1> 是否禁用 SockJS 请求自动添加 CORS 头。 默认值为 `false`。
<int-websocket:outbound-channel-adapter>
属性
以下列表显示了 <int-websocket:outbound-channel-adapter>
元素可用的属性:
<int-websocket:outbound-channel-adapter
id="" [id="CO3-1"]1
channel="" [id="CO3-2"]2
container="" [id="CO3-3"]3
default-protocol-handler="" [id="CO3-4"]4
protocol-handlers="" [id="CO3-5"]5
message-converters="" [id="CO3-6"]6
merge-with-default-converters="" [id="CO3-7"]7
auto-startup="" [id="CO3-8"]8
phase=""/> [id="CO3-9"]9
<1> 组件 bean 名称。 如果您不提供 `channel` 属性,则会创建一个 `DirectChannel`,并以此 `id` 属性作为 bean 名称注册到应用程序上下文中。 在这种情况下,端点将以 bean 名称 `id` 加上 `.adapter` 注册。 `MessageHandler` 将以 bean 别名 `id` 加上 `.handler` 注册。 <1> 标识附加到此适配器的通道。 <1> `IntegrationWebSocketContainer` bean 的引用,它封装了低级连接和 `WebSocketSession` 处理操作。 必需。 <1> `SubProtocolHandler` 实例的可选引用。 当客户端未请求子协议或它是单个协议处理程序时使用。 如果未提供此引用或 `protocol-handlers` 列表,则默认使用 `PassThruSubProtocolHandler`。 <1> 此通道适配器的 `SubProtocolHandler` bean 引用列表。 如果您只提供一个 bean 引用并且不提供 `default-protocol-handler`,则该单个 `SubProtocolHandler` 将用作 `default-protocol-handler`。 如果您不设置此属性或 `default-protocol-handler`,则默认使用 `PassThruSubProtocolHandler`。 <1> 此通道适配器的 `MessageConverter` bean 引用列表。 <1> 布尔值,指示在任何自定义转换器之后是否应注册默认转换器。 此标志仅在提供了 `message-converters` 时使用。 否则,所有默认转换器都会注册。 默认为 `false`。 默认转换器(按顺序)是:`StringMessageConverter`、`ByteArrayMessageConverter` 和 `MappingJackson2MessageConverter`(如果类路径上存在 Jackson 库)。 <1> 布尔值,指示此端点是否应自动启动。 默认为 `true`。 <1> 此端点应启动和停止的生命周期阶段。 值越低,此端点启动越早,停止越晚。 默认值为 `Integer.MIN_VALUE`。 值可以是负数。 请参阅 link:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/context/SmartLifecycle.html[`SmartLifeCycle`]。
<int-websocket:inbound-channel-adapter>
属性
以下列表显示了 <int-websocket:outbound-channel-adapter>
元素可用的属性:
<int-websocket:inbound-channel-adapter
id="" [id="CO4-1"]1
channel="" [id="CO4-2"]2
error-channel="" [id="CO4-3"]3
container="" [id="CO4-4"]4
default-protocol-handler="" [id="CO4-5"]5
protocol-handlers="" [id="CO4-6"]6
message-converters="" [id="CO4-7"]7
merge-with-default-converters="" [id="CO4-8"]8
send-timeout="" [id="CO4-9"]9
payload-type="" [id="CO4-10"]10
use-broker="" [id="CO4-11"]11
auto-startup="" [id="CO4-12"]12
phase=""/> [id="CO4-13"]13
<1> 组件 bean 名称。 如果您不设置 `channel` 属性,则会创建一个 `DirectChannel`,并以此 `id` 属性作为 bean 名称注册到应用程序上下文中。 在这种情况下,端点将以 bean 名称 `id` 加上 `.adapter` 注册。 <1> 标识附加到此适配器的通道。 <1> `MessageChannel` bean 引用,`ErrorMessage` 实例应发送到该引用。 <1> 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 <1> 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 <1> 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 <1> 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 <1> 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 <1> 如果通道可以阻塞,则在向通道发送消息时等待的最大时间(以毫秒为单位)。 例如,如果 `QueueChannel` 已达到其最大容量,则它可能会阻塞直到有可用空间。 <1> 目标 `payload` 的 Java 类型的完全限定名称,用于从传入的 `WebSocketMessage` 进行转换。 默认为 `java.lang.String`。 <1> 指示此适配器是否将 `non-MESSAGE` `WebSocketMessage` 实例和带有代理目标的 संदेश发送到应用程序上下文中的 `AbstractBrokerMessageHandler`。 当此属性为 `true` 时,需要 `Broker Relay` 配置。 此属性仅在服务器端使用。 在客户端,它被忽略。 默认为 `false`。 <1> 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 <1> 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。
使用 ClientStompEncoder
从 4.3.13 版本开始,Spring Integration 提供了 ClientStompEncoder
(作为标准 StompEncoder
的扩展)用于 WebSocket 通道适配器的客户端。
为了正确准备客户端消息,您必须将 ClientStompEncoder
实例注入到 StompSubProtocolHandler
中。
默认 StompSubProtocolHandler
的一个问题是它设计用于服务器端,因此它将 SEND
stompCommand
头更新为 MESSAGE
(STOMP 协议对服务器端的要求)。
如果客户端未以正确的 SEND
web socket 帧发送其消息,某些 STOMP 代理将不接受它们。
在这种情况下,ClientStompEncoder
的目的是覆盖 stompCommand
头并将其设置为 SEND
值,然后将消息编码为 byte[]
。
动态 WebSocket 端点注册
从 5.5 版本开始,WebSocket 服务器端点(基于 ServerWebSocketContainer
的通道适配器)现在可以在运行时注册(和移除)—— ServerWebSocketContainer
映射的 paths
通过 HandlerMapping
暴露到 DispatcherServlet
中,并可供 WebSocket 客户端访问。
动态和运行时集成流 支持以透明方式注册这些端点:
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
在动态流注册上调用 |
动态 Websocket 端点只能通过 Spring Integration 机制注册:当使用常规 Spring @EnableWebsocket
时,Spring Integration 配置会回退,并且不会注册动态端点所需的基础设施。