STOMP 客户端

Spring 提供了基于 WebSocket 的 STOMP 客户端和基于 TCP 的 STOMP 客户端。

首先,您可以创建并配置 WebSocketStompClient,如以下示例所示:

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在前面的示例中,您可以将 StandardWebSocketClient 替换为 SockJsClient, 因为 SockJsClient 也是 WebSocketClient 的一个实现。SockJsClient 可以 使用 WebSocket 或基于 HTTP 的传输作为回退。有关更多详细信息,请参阅 SockJsClient

接下来,您可以建立连接并为 STOMP 会话提供一个处理器,如以下示例所示:

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当会话准备就绪时,处理器会收到通知,如以下示例所示:

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

	@Override
	public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
		// ...
	}
}

一旦会话建立,任何有效载荷都可以发送,并使用配置的 MessageConverter 进行 序列化,如以下示例所示:

session.send("/topic/something", "payload");

您也可以订阅目的地。subscribe 方法需要一个用于订阅消息的处理器,并返回一个 Subscription 句柄,您可以使用它来取消订阅。对于每条收到的消息,处理器可以指定 有效载荷应反序列化到的目标 Object 类型,如以下示例所示:

session.subscribe("/topic/something", new StompFrameHandler() {

	@Override
	public Type getPayloadType(StompHeaders headers) {
		return String.class;
	}

	@Override
	public void handleFrame(StompHeaders headers, Object payload) {
		// ...
	}

});

要启用 STOMP 心跳,您可以为 WebSocketStompClient 配置一个 TaskScheduler, 并可选地自定义心跳间隔(写入不活动时为 10 秒,这将导致发送心跳;读取不活动时为 10 秒,这将关闭连接)。

WebSocketStompClient 仅在不活动时发送心跳,即当没有其他消息发送时。 当使用外部代理时,这可能会带来挑战,因为具有非代理目的地的消息表示活动,但 实际上并未转发到代理。在这种情况下,您可以在初始化 外部代理 时配置一个 TaskScheduler, 这确保了即使只发送具有非代理目的地的消息,心跳也会转发到代理。

当您使用 WebSocketStompClient 进行性能测试以模拟同一台机器上的数千个客户端时, 请考虑关闭心跳,因为每个连接都会调度自己的心跳任务,这对于在同一台机器上运行 大量客户端并未进行优化。

STOMP 协议还支持回执,客户端必须添加一个 receipt 头部,服务器在处理完发送或 订阅后会用一个 RECEIPT 帧进行响应。为了支持这一点,StompSession 提供了 setAutoReceipt(boolean),它会导致在每个后续的发送或订阅事件中添加一个 receipt 头部。 或者,您也可以手动将回执头部添加到 StompHeaders。 发送和订阅都返回一个 Receiptable 实例,您可以使用它来注册回执成功和失败回调。 对于此功能,您必须为客户端配置一个 TaskScheduler 以及回执过期的时间量 (默认为 15 秒)。

请注意,StompSessionHandler 本身是一个 StompFrameHandler,除了用于处理消息时 出现的异常的 handleException 回调和用于处理传输级别错误(包括 ConnectionLostException)的 handleTransportError 之外,它还可以处理 ERROR 帧。

您可以使用 WebSocketStompClientinboundMessageSizeLimitoutboundMessageSizeLimit 属性来限制入站和出站 WebSocket 消息的最大大小。 当出站 STOMP 消息超过限制时,它会被拆分成部分帧,接收方必须重新组装。 默认情况下,出站消息没有大小限制。当入站 STOMP 消息大小超过配置的限制时,会抛出 StompConversionException。入站消息的默认大小限制为 64KB

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB