STOMP 支持
Spring Integration 4.2 版本引入了 STOMP(Simple Text Orientated Messaging Protocol,简单文本面向消息协议)客户端支持。它基于 Spring Framework 消息模块 stomp
包的架构、基础设施和 API。Spring Integration 使用了许多 Spring STOMP 组件(例如 StompSession
和 StompClientSupport
)。更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP 支持章节。你需要将此依赖项添加到你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:{project-version}"
对于服务器端组件,你需要添加 org.springframework:spring-websocket
和/或 io.projectreactor.netty:reactor-netty
依赖项。
概述
要配置 STOMP,你应该从 STOMP 客户端对象开始。Spring Framework 提供了以下实现:
-
WebSocketStompClient
: 基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS,用于基于 HTTP 的 WebSocket 仿真,带有 SockJS 客户端。 -
ReactorNettyTcpStompClient
: 基于reactor-netty
项目中的ReactorNettyTcpClient
构建。
你可以提供任何其他 StompClientSupport
实现。请参阅这些类的 Javadoc。
StompClientSupport
类被设计为 工厂,用于为提供的 StompSessionHandler
生成 StompSession
,所有剩余的工作都通过对该 StompSessionHandler
和 StompSession
抽象的 回调 来完成。使用 Spring Integration 适配器 抽象,我们需要提供一些托管共享对象来将我们的应用程序表示为具有其唯一会话的 STOMP 客户端。为此,Spring Integration 提供了 StompSessionManager
抽象来管理任何提供的 StompSessionHandler
之间的 单个 StompSession
。这允许为特定的 STOMP Broker 使用 入站 或 出站 通道适配器(或两者)。有关更多信息,请参阅 StompSessionManager
(及其实现)的 JavaDocs。
STOMP 入站通道适配器
StompInboundChannelAdapter
是一个一站式的 MessageProducer
组件,它将你的 Spring Integration 应用程序订阅到提供的 STOMP 目标,并从它们接收消息(通过连接的 StompSession
上提供的 MessageConverter
从 STOMP 帧转换而来)。你可以通过在 StompInboundChannelAdapter
上使用适当的 @ManagedOperation
注解,在运行时更改目标(以及 STOMP 订阅)。
有关更多配置选项,请参阅 stomp-namespace 和 StompInboundChannelAdapter
的 Javadoc。
STOMP 出站通道适配器
StompMessageHandler
是 <int-stomp:outbound-channel-adapter>
的 MessageHandler
,用于通过 StompSession
(由共享的 StompSessionManager
提供)将传出的 Message<?>
实例发送到 STOMP destination
(预配置或在运行时通过 SpEL 表达式确定)。
有关更多配置选项,请参阅 stomp-namespace 和 StompMessageHandler
的 Javadoc。
STOMP 头部映射
STOMP 协议提供头部作为其帧的一部分。STOMP 帧的整个结构具有以下格式:
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供了 StompHeaders
来表示这些头部。有关更多详细信息,请参阅 Javadoc。STOMP 帧被转换为 Message<?>
实例,这些头部被映射到 MessageHeaders
实例。Spring Integration 为 STOMP 适配器提供了默认的 HeaderMapper
实现。该实现是 StompHeaderMapper
。它分别为入站和出站适配器提供了 fromHeaders()
和 toHeaders()
操作。
与许多其他 Spring Integration 模块一样,IntegrationStompHeaders
类已被引入,用于将标准 STOMP 头部映射到 MessageHeaders
,其中 stomp_
作为头部名称前缀。此外,所有带有该前缀的 MessageHeaders
实例在发送到目标时都会映射到 StompHeaders
。
有关更多信息,请参阅这些类的 Javadoc 和 stomp-namespace 中 mapped-headers
属性的描述。
STOMP 集成事件
许多 STOMP 操作都是异步的,包括错误处理。例如,STOMP 有一个 RECEIPT
服务器帧,当客户端帧通过添加 RECEIPT
头部请求时,它会返回该帧。为了提供对这些异步事件的访问,Spring Integration 发出 StompIntegrationEvent
实例,你可以通过实现 ApplicationListener
或使用 <int-event:inbound-channel-adapter>
来获取它们(请参阅 接收 Spring 应用程序事件)。
具体来说,当 stompSessionListenableFuture
因连接 STOMP Broker 失败而收到 onFailure()
时,AbstractStompSessionManager
会发出 StompExceptionEvent
。另一个例子是 StompMessageHandler
。它处理 ERROR
STOMP 帧,这是服务器对 StompMessageHandler
发送的不正确(不被接受)消息的响应。
StompMessageHandler
作为 StompSession.Receiptable
回调的一部分,在发送到 StompSession
的消息的异步响应中发出 StompReceiptEvent
。StompReceiptEvent
可以是正面的或负面的,这取决于在 receiptTimeLimit
期限内是否从服务器收到了 RECEIPT
帧,你可以在 StompClientSupport
实例上配置该期限。它默认为 15 * 1000
(以毫秒为单位,即 15 秒)。
|
有关如何配置 Spring Integration 以接受这些 ApplicationEvent
实例的更多信息,请参阅 stomp-java-config。
STOMP 适配器 Java 配置
以下示例显示了 STOMP 适配器的全面 Java 配置:
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 命名空间支持
Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。要将其包含在你的配置中,请在你的应用程序上下文配置文件中提供以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
理解 <int-stomp:outbound-channel-adapter>
元素
以下列表显示了 STOMP 出站通道适配器的可用属性:
<int-stomp:outbound-channel-adapter
id="" [id="CO1-1"]1
channel="" [id="CO1-2"]2
stomp-session-manager="" [id="CO1-3"]3
header-mapper="" [id="CO1-4"]4
mapped-headers="" [id="CO1-5"]5
destination="" [id="CO1-6"]6
destination-expression="" [id="CO1-7"]7
auto-startup="" [id="CO1-8"]8
phase=""/> [id="CO1-9"]9
1 | 组件 bean 名称。MessageHandler 以 id 加 .handler 的 bean 别名注册。如果你不设置 channel 属性,则会创建一个 DirectChannel ,并以该 id 属性的值作为 bean 名称注册到应用程序上下文中。在这种情况下,端点以 id 加 .adapter 的 bean 名称注册。 |
2 | 如果存在 id ,则标识附加到此适配器的通道。请参阅 id 。可选。 |
3 | 对 StompSessionManager bean 的引用,该 bean 封装了底层连接和 StompSession 处理操作。必需。 |
4 | 对实现 HeaderMapper<StompHeaders> 的 bean 的引用,该 bean 将 Spring Integration MessageHeaders 映射到 STOMP 帧头部。它与 mapped-headers 互斥。它默认为 StompHeaderMapper 。 |
5 | 逗号分隔的 STOMP 头部名称列表,这些头部将映射到 STOMP 帧头部。仅当未设置 header-mapper 引用时才能提供。此列表中的值也可以是与头部名称匹配的简单模式(例如 myheader* 或 *myheader )。一个特殊令牌 (STOMP_OUTBOUND_HEADERS ) 表示所有标准 STOMP 头部(内容长度、回执、心跳等)。它们默认包含在内。如果你想添加自己的头部并希望标准头部也进行映射,则必须包含此令牌或使用 header-mapper 提供你自己的 HeaderMapper 实现。 |
6 | STOMP 消息发送到的目标名称。它与 destination-expression 互斥。 |
7 | 一个 SpEL 表达式,在运行时针对每个 Spring Integration Message 作为根对象进行评估。它与 destination 互斥。 |
8 | 布尔值,指示此端点是否应自动启动。它默认为 true 。 |
9 | 此端点应启动和停止的生命周期阶段。值越低,此端点启动越早,停止越晚。默认值为 Integer.MIN_VALUE 。值可以是负数。请参阅 SmartLifeCycle 。 |
理解 <int-stomp:inbound-channel-adapter>
元素
以下列表显示了 STOMP 入站通道适配器的可用属性:
<int-stomp:inbound-channel-adapter
id="" [id="CO2-1"]1
channel="" [id="CO2-2"]2
error-channel="" [id="CO2-3"]3
stomp-session-manager="" [id="CO2-4"]4
header-mapper="" [id="CO2-5"]5
mapped-headers="" [id="CO2-6"]6
destinations="" [id="CO2-7"]7
send-timeout="" [id="CO2-8"]8
payload-type="" [id="CO2-9"]9
auto-startup="" [id="CO2-10"]10
phase=""/> [id="CO2-11"]11
1 | 组件 bean 名称。如果你不设置 channel 属性,则会创建一个 DirectChannel ,并以该 id 属性的值作为 bean 名称注册到应用程序上下文中。在这种情况下,端点以 id 加 .adapter 的 bean 名称注册。 |
2 | 标识附加到此适配器的通道。 |
3 | MessageChannel bean 引用,ErrorMessage 实例应发送到该引用。 |
4 | 请参阅 <<`<int-stomp:outbound-channel-adapter>`,stomp-outbound-channel-adapter>> 上的相同选项。 |
5 | 逗号分隔的 STOMP 头部名称列表,这些头部将从 STOMP 帧头部映射。仅当未设置 header-mapper 引用时才能提供。此列表中的值也可以是与头部名称匹配的简单模式(例如 myheader* 或 *myheader )。一个特殊令牌 (STOMP_INBOUND_HEADERS ) 表示所有标准 STOMP 头部(内容长度、回执、心跳等)。它们默认包含在内。如果你想添加自己的头部并希望标准头部也进行映射,则必须包含此令牌或使用 header-mapper 提供你自己的 HeaderMapper 实现。 |
6 | 请参阅 <<`<int-stomp:outbound-channel-adapter>`,stomp-outbound-channel-adapter>> 上的相同选项。 |
7 | 逗号分隔的 STOMP 目标名称列表,用于订阅。目标列表(以及订阅)可以通过 addDestination() 和 removeDestination() @ManagedOperation 注解在运行时修改。 |
8 | 如果通道可以阻塞,则在向通道发送消息时等待的最大时间量(以毫秒为单位)。例如,如果 QueueChannel 已达到其最大容量,则它可能会阻塞直到有可用空间。 |
9 | 目标 payload 的 Java 类型的完全限定名称,用于从传入的 STOMP 帧进行转换。它默认为 String.class 。 |
10 | 请参阅 <<`<int-stomp:outbound-channel-adapter>`,stomp-outbound-channel-adapter>> 上的相同选项。 |
11 | 请参阅 <<`<int-stomp:outbound-channel-adapter>`,stomp-outbound-channel-adapter>> 上的相同选项。 |