TCP 消息关联

IP 端点的一个目标是与 Spring Integration 应用程序以外的系统进行通信。 因此,默认情况下只发送和接收消息负载。 自 3.0 版本以来,您可以使用 JSON、Java 序列化或自定义序列化器和反序列化器来传输消息头。 有关更多信息,请参阅 传输消息头。 框架不提供任何消息关联(网关除外),服务器端的协作通道适配器也不提供。 本文档后面,我们将讨论应用程序可用的各种关联技术。 在大多数情况下,这需要消息的特定应用程序级关联,即使消息负载包含一些自然的关联数据(如订单号)。

网关

网关自动关联消息。 但是,对于相对低流量的应用程序,您应该使用出站网关。 当您将连接工厂配置为对所有消息对使用单个共享连接时('single-use="false"'),一次只能处理一条消息。 新消息必须等待直到收到对前一条消息的回复。 当连接工厂配置为每个新消息使用新连接时('single-use="true"'),此限制不适用。 虽然此设置可以比共享连接环境提供更高的吞吐量,但它伴随着为每个消息对打开和关闭新连接的开销。

因此,对于高流量消息,请考虑使用一对协作通道适配器。 但是,要这样做,您需要提供协作逻辑。

Spring Integration 2.2 中引入的另一个解决方案是使用 CachingClientConnectionFactory,它允许使用共享连接池。

协作的出站和入站通道适配器

为了实现高吞吐量(避免使用网关的陷阱,如 前面提到),您可以配置一对协作的出站和入站通道适配器。 您还可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是请求-回复语义)。 在服务器端,消息关联由适配器自动处理,因为入站适配器添加了一个消息头,允许出站适配器在发送回复消息时确定使用哪个连接。

在服务器端,您必须填充 ip_connectionId 消息头,因为它用于将消息与连接关联起来。 源自入站适配器的消息会自动设置该消息头。 如果您希望构建要发送的其他消息,则需要设置该消息头。 您可以从传入消息中获取消息头值。

在客户端,应用程序必须在需要时提供自己的关联逻辑。 您可以通过多种方式实现这一点。

如果消息负载具有某些自然的关联数据(例如事务 ID 或订单号),并且您不需要保留原始出站消息中的任何信息(例如回复通道消息头),则关联很简单,并且无论如何都会在应用程序级别完成。

如果消息负载具有某些自然的关联数据(例如事务 ID 或订单号),但您需要保留原始出站消息中的某些信息(例如回复通道消息头),您可以保留原始出站消息的副本(可能通过使用发布-订阅通道),并使用聚合器重新组合所需的数据。

对于前两种情况中的任何一种,如果负载没有自然的关联数据,您可以提供一个位于出站通道适配器上游的转换器,以使用此类数据增强负载。 此类转换器可以将原始负载转换为包含原始负载和消息头子集的新对象。 当然,消息头中的活动对象(例如回复通道)不能包含在转换后的负载中。

如果您选择这样的策略,您需要确保连接工厂具有适当的序列化器-反序列化器对来处理此类负载(例如使用 Java 序列化的 DefaultSerializerDefaultDeserializer,或自定义序列化器和反序列化器)。 TCP 连接工厂中提到的 ByteArray*Serializer 选项,包括默认的 ByteArrayCrLfSerializer,不支持此类负载,除非转换后的负载是 Stringbyte[]

在 2.2 版本之前,当协作通道适配器使用客户端连接工厂时,so-timeout 属性默认为默认回复超时(10 秒)。 这意味着,如果入站适配器在此期间没有收到任何数据,则套接字将被关闭。 这种默认行为在真正的异步环境中不适用,因此它现在默认为无限超时。 您可以通过将客户端连接工厂上的 so-timeout 属性设置为 10000 毫秒来恢复以前的默认行为。

从 5.4 版本开始,多个出站通道适配器和一个 TcpInboundChannelAdapter 可以共享同一个连接工厂。 这允许应用程序同时支持请求/回复和任意服务器 → 客户端消息传递。 有关更多信息,请参阅 TCP 网关

传输消息头

TCP 是一种流协议。 SerializersDeserializers 在流中划分消息。 在 3.0 之前,只能通过 TCP 传输消息负载(Stringbyte[])。 从 3.0 开始,您可以传输选定的消息头以及负载。 但是,“live” 对象,例如 replyChannel 消息头,无法序列化。

通过 TCP 发送消息头信息需要一些额外的配置。

第一步是为 ConnectionFactory 提供一个 MessageConvertingTcpMessageMapper,它使用 mapper 属性。 此映射器委托给任何 MessageConverter 实现,将消息转换为可由配置的 serializerdeserializer 序列化和反序列化的对象。

Spring Integration 提供了一个 MapMessageConverter,它允许指定一个消息头列表,这些消息头连同负载一起添加到 Map 对象中。 生成的 Map 有两个条目:payloadheadersheaders 条目本身是一个 Map,并包含选定的消息头。

第二步是提供一个序列化器和反序列化器,它们可以在 Map 和某些线路格式之间进行转换。 这可以是一个自定义的 SerializerDeserializer,如果您需要与非 Spring Integration 应用程序进行通信,通常需要这样做。

Spring Integration 提供了一个 MapJsonSerializer,用于将 Map 转换为 JSON。 它使用 Spring Integration JsonObjectMapper。 如果需要,您可以提供自定义的 JsonObjectMapper。 默认情况下,序列化器在对象之间插入一个换行符(0x0a)。 有关更多信息,请参阅 Javadoc

JsonObjectMapper 使用类路径上的任何 Jackson 版本。

您还可以使用 DefaultSerializerDefaultDeserializerMap 进行标准 Java 序列化。

以下示例显示了使用 JSON 传输 correlationIdsequenceNumbersequenceSize 消息头的连接工厂配置:

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

使用上述配置发送的消息,其负载为 'something',将在线路上显示如下:

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}