TCP 连接工厂

概述

对于 TCP,底层连接的配置通过使用连接工厂来提供。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立出站连接。服务器连接工厂监听入站连接。

出站通道适配器使用客户端连接工厂,但您也可以向入站通道适配器提供客户端连接工厂的引用。该适配器接收由出站适配器创建的连接上收到的任何入站消息。

入站通道适配器或网关使用服务器连接工厂。(实际上,连接工厂没有它就无法工作)。您还可以向出站适配器提供服务器连接工厂的引用。然后,您可以使用该适配器通过同一连接向入站消息发送回复。

只有当回复包含由连接工厂插入到原始消息中的 ip_connectionId 消息头时,回复消息才会被路由到连接。

这是在入站和出站适配器之间共享连接工厂时执行的消息关联的程度。这种共享允许通过 TCP 进行异步双向通信。默认情况下,只有负载信息通过 TCP 传输。因此,任何消息关联都必须由下游组件(例如聚合器或其他端点)执行。对传输选定消息头的支持是在 3.0 版本中引入的。有关更多信息,请参阅 TCP 消息关联

您可以向每种类型最多一个适配器提供连接工厂的引用。

Spring Integration 提供了使用 java.net.Socketjava.nio.channel.SocketChannel 的连接工厂。

以下示例展示了一个使用 java.net.Socket 连接的简单服务器连接工厂:

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

以下示例展示了一个使用 java.nio.channel.SocketChannel 连接的简单服务器连接工厂:

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>

从 Spring Integration 4.2 版本开始,如果服务器配置为监听随机端口(通过将端口设置为 0),您可以使用 getPort() 获取操作系统选择的实际端口。此外,getServerSocketAddress() 允许您获取完整的 SocketAddress。有关更多信息,请参阅 TcpServerConnectionFactory 接口的 Javadoc

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

以下示例展示了一个使用 java.net.Socket 连接并为每条消息创建新连接的客户端连接工厂:

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

从 5.2 版本开始,客户端连接工厂支持 connectTimeout 属性,以秒为单位指定,默认为 60。

消息边界划分(序列化器和反序列化器)

TCP 是一种流协议。这意味着必须为通过 TCP 传输的数据提供一些结构,以便接收方可以将数据划分为离散的消息。连接工厂配置为使用序列化器和反序列化器,分别用于在消息负载和通过 TCP 发送的比特之间进行转换。这是通过为入站和出站消息分别提供反序列化器和序列化器来实现的。Spring Integration 提供了许多标准序列化器和反序列化器。

ByteArrayCrlfSerializer* 将字节数组转换为字节流,后跟回车和换行符 (\r\n)。这是默认的序列化器(和反序列化器),可以(例如)与 telnet 作为客户端一起使用。

ByteArraySingleTerminatorSerializer* 将字节数组转换为字节流,后跟一个终止字符(默认为 0x00)。

ByteArrayLfSerializer* 将字节数组转换为字节流,后跟一个换行符 (0x0a)。

ByteArrayStxEtxSerializer* 将字节数组转换为字节流,前缀为 STX (0x02),后缀为 ETX (0x03)。

ByteArrayLengthHeaderSerializer 将字节数组转换为字节流,前缀为网络字节序(大端)的二进制长度。这是一种高效的反序列化器,因为它不必解析每个字节来查找终止字符序列。它还可以用于包含二进制数据的负载。长度头的默认大小是四个字节(一个整数),允许消息最大为 (2^31 - 1) 字节。但是,length 头可以是一个字节(无符号),用于最大 255 字节的消息,或者一个无符号短整型(2 字节),用于最大 (2^16 - 1) 字节的消息。如果您需要任何其他格式的头,您可以子类化 ByteArrayLengthHeaderSerializer 并提供 readHeaderwriteHeader 方法的实现。绝对最大数据大小是 (2^31 - 1) 字节。从 5.2 版本开始,头值可以包含头的长度以及负载。设置 inclusive 属性以启用该机制(它必须在生产者和消费者之间设置为相同)。

ByteArrayRawSerializer* 将字节数组转换为字节流,不添加任何额外的消息边界数据。使用此序列化器(和反序列化器),消息的结束由客户端有序地关闭套接字来指示。当使用此序列化器时,消息接收会一直挂起,直到客户端关闭套接字或发生超时。超时不会导致消息。当使用此序列化器且客户端是 Spring Integration 应用程序时,客户端必须使用配置了 single-use="true" 的连接工厂。这样做会导致适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。您应该只将此序列化器与通道适配器(而非网关)使用的连接工厂一起使用,并且连接工厂应由入站或出站适配器之一使用,但不能同时使用。另请参阅本节后面的 ByteArrayElasticRawDeserializer。但是,从 5.2 版本开始,出站网关有一个新的属性 closeStreamAfterSend;这允许使用原始序列化器/反序列化器,因为 EOF 会发送给服务器,同时保持连接打开以接收回复。

在 4.2.2 版本之前,当使用非阻塞 I/O (NIO) 时,此序列化器将超时(在读取期间)视为文件结束,并且到目前为止读取的数据作为消息发出。这是不可靠的,不应该用于分隔消息。它现在将此类情况视为异常。在极少数情况下,如果您以这种方式使用它,您可以通过将 treatTimeoutAsEndOfMessage 构造函数参数设置为 true 来恢复以前的行为。

它们都是 AbstractByteArraySerializer 的子类,它实现了 org.springframework.core.serializer.Serializerorg.springframework.core.serializer.Deserializer 接口。为了向后兼容,使用 AbstractByteArraySerializer 的任何子类进行序列化的连接也接受首先转换为字节数组的 String。这些序列化器和反序列化器都将包含相应格式的输入流转换为字节数组负载。

为了避免由于行为不当的客户端(不遵守配置的序列化器协议的客户端)导致内存耗尽,这些序列化器施加了最大消息大小。如果传入消息超过此大小,则会抛出异常。默认最大消息大小为 2048 字节。您可以通过设置 maxMessageSize 属性来增加它。如果您使用默认序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为一个显式 bean,并设置 maxMessageSize 属性,然后将连接工厂配置为使用该 bean。

本节前面标有 * 的类使用一个中间缓冲区并将解码后的数据复制到正确大小的最终缓冲区。从 4.3 版本开始,您可以通过设置 poolSize 属性来配置这些缓冲区,以使这些原始缓冲区可以重用,而不是为每条消息分配和丢弃,这是默认行为。将属性设置为负值会创建一个无界的池。如果池是有界的,您还可以设置 poolWaitTimeout 属性(以毫秒为单位),在此之后,如果没有可用的缓冲区,则会抛出异常。它默认为无限大。此类异常会导致套接字关闭。

如果您希望在自定义反序列化器中使用相同的机制,您可以扩展 AbstractPooledBufferByteArraySerializer(而不是其超类 AbstractByteArraySerializer)并实现 doDeserialize() 而不是 deserialize()。缓冲区会自动返回到池中。AbstractPooledBufferByteArraySerializer 还提供了一个方便的实用方法:copyToSizedArray()

5.0 版本添加了 ByteArrayElasticRawDeserializer。这与上面 ByteArrayRawSerializer 的反序列化器侧相似,不同之处在于不需要设置 maxMessageSize。在内部,它使用一个 ByteArrayOutputStream,允许缓冲区根据需要增长。客户端必须有序地关闭套接字以表示消息结束。

此反序列化器只能在信任对等方时使用;它容易受到由于内存不足条件导致的 DoS 攻击。

MapJsonSerializer 使用 Jackson ObjectMapperMap 和 JSON 之间进行转换。您可以将此序列化器与 MessageConvertingTcpMessageMapperMapMessageConverter 结合使用,以 JSON 格式传输选定的消息头和负载。

Jackson ObjectMapper 无法在流中划分消息。因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息划分。默认情况下,使用 ByteArrayLfSerializer,导致在线路上消息格式为 <json><LF>,但您可以将其配置为使用其他格式。(下一个示例展示了如何实现)。

最后一个标准序列化器是 org.springframework.core.serializer.DefaultSerializer,您可以使用它通过 Java 序列化转换可序列化对象。org.springframework.core.serializer.DefaultDeserializer 用于包含可序列化对象的流的入站反序列化。

如果您不想使用默认的序列化器和反序列化器 (ByteArrayCrLfSerializer),您必须在连接工厂上设置 serializerdeserializer 属性。以下示例展示了如何实现:

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

一个使用 java.net.Socket 连接并在线路上使用 Java 序列化的服务器连接工厂。

有关连接工厂可用属性的完整详细信息,请参阅本节末尾的 参考

默认情况下,不对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能导致连接延迟。要将 IP 地址转换为用于消息头的 Host 名称,可以通过将 lookup-host 属性设置为 true 来覆盖默认行为。

您还可以修改套接字和套接字工厂的属性。有关更多信息,请参阅 SSL/TLS 支持。如其中所述,无论是否使用 SSL,此类修改都是可能的。

主机验证

从 5.1.0 版本开始,主机验证默认启用以增强安全性。此功能确保在 TCP 连接期间验证服务器的身份。

如果您遇到需要禁用主机验证的场景(不推荐),您可以在 tcp-connection-factory 中配置 socket-support 属性。

<int-ip:tcp-connection-factory id="client"
                                type="client"
                                host="localhost"
                                port="0"
                                socket-support="customSocketSupport"
                                single-use="true"
                                so-timeout="10000"/>

<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
	<constructor-arg value="false" />
</bean>

自定义序列化器和反序列化器

如果您的数据格式不受任何标准反序列化器的支持,您可以实现自己的反序列化器;您还可以实现自定义序列化器。

要实现自定义序列化器和反序列化器对,请实现 org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.Serializer 接口。

当反序列化器检测到消息之间输入流已关闭时,它必须抛出 SoftEndOfStreamException;这是向框架发出的信号,表示关闭是“正常”的。如果在解码消息时流关闭,则应抛出其他异常。

从 5.2 版本开始,SoftEndOfStreamException 现在是 RuntimeException 而不是扩展 IOException

TCP 缓存客户端连接工厂

前面所述,TCP 套接字可以是“一次性使用”(一个请求或响应)或共享的。共享套接字在高并发环境下与出站网关的性能不佳,因为套接字一次只能处理一个请求或响应。

为了提高性能,您可以使用协作通道适配器而不是网关,但这需要应用程序级别的消息关联。有关更多信息,请参阅 TCP 消息关联

Spring Integration 2.2 引入了缓存客户端连接工厂,它使用共享套接字池,允许网关使用共享连接池处理多个并发请求。

TCP 故障转移客户端连接工厂

您可以配置支持故障转移到一个或多个其他服务器的 TCP 连接工厂。发送消息时,工厂会遍历所有配置的工厂,直到消息可以发送或找不到连接为止。最初,使用配置列表中的第一个工厂。如果连接随后失败,则下一个工厂将成为当前工厂。以下示例展示了如何配置故障转移客户端连接工厂:

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>

使用故障转移连接工厂时,singleUse 属性必须在工厂本身及其配置使用的工厂列表之间保持一致。

连接工厂有两个与故障恢复相关的属性,当与共享连接一起使用时 (singleUse=false):

  • refreshSharedInterval

  • closeOnRefresh

考虑基于上述配置的以下场景: 假设 clientFactory1 无法建立连接,但 clientFactory2 可以。当 refreshSharedInterval 过去后调用 failCFgetConnection() 方法时,我们将再次尝试使用 clientFactory1 连接;如果成功,与 clientFactory2 的连接将关闭。如果 closeOnRefreshfalse,则“旧”连接将保持打开状态,并且如果第一个工厂再次失败,将来可能会被重用。

refreshSharedInterval 设置为仅在时间过期后尝试重新连接第一个工厂;如果只想在当前连接失败时才故障恢复到第一个工厂,则将其设置为 Long.MAX_VALUE(默认值)。

closeOnRefresh 设置为在刷新实际创建新连接后关闭“旧”连接。

如果任何委托工厂是 CachingClientConnectionFactory,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,将始终查阅连接工厂列表以获取连接。

从 5.3 版本开始,这些属性默认为 Long.MAX_VALUEtrue,因此工厂仅在当前连接失败时才尝试故障恢复。要恢复到以前版本的默认行为,请将它们设置为 0false

另请参阅 测试连接

TCP 线程亲和性连接工厂

Spring Integration 5.0 版本引入了此连接工厂。它将连接绑定到调用线程,并且每次该线程发送消息时都会重用相同的连接。这会一直持续到连接关闭(由服务器或网络)或线程调用 releaseConnection() 方法。连接本身由另一个客户端工厂实现提供,该工厂必须配置为提供非共享(一次性使用)连接,以便每个线程都获得一个连接。

以下示例展示了如何配置 TCP 线程亲和性连接工厂:

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}