TCP 消息关联
网关
网关自动关联消息。 但是,对于相对低流量的应用程序,您应该使用出站网关。 当您将连接工厂配置为对所有消息对使用单个共享连接时('single-use="false"'),一次只能处理一条消息。 新消息必须等待直到收到对前一条消息的回复。 当连接工厂配置为每个新消息使用新连接时('single-use="true"'),此限制不适用。 虽然此设置可以比共享连接环境提供更高的吞吐量,但它伴随着为每个消息对打开和关闭新连接的开销。
因此,对于高流量消息,请考虑使用一对协作通道适配器。 但是,要这样做,您需要提供协作逻辑。
Spring Integration 2.2 中引入的另一个解决方案是使用 CachingClientConnectionFactory
,它允许使用共享连接池。
协作的出站和入站通道适配器
为了实现高吞吐量(避免使用网关的陷阱,如 前面提到),您可以配置一对协作的出站和入站通道适配器。 您还可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是请求-回复语义)。 在服务器端,消息关联由适配器自动处理,因为入站适配器添加了一个消息头,允许出站适配器在发送回复消息时确定使用哪个连接。
在服务器端,您必须填充 |
在客户端,应用程序必须在需要时提供自己的关联逻辑。 您可以通过多种方式实现这一点。
如果消息负载具有某些自然的关联数据(例如事务 ID 或订单号),并且您不需要保留原始出站消息中的任何信息(例如回复通道消息头),则关联很简单,并且无论如何都会在应用程序级别完成。
如果消息负载具有某些自然的关联数据(例如事务 ID 或订单号),但您需要保留原始出站消息中的某些信息(例如回复通道消息头),您可以保留原始出站消息的副本(可能通过使用发布-订阅通道),并使用聚合器重新组合所需的数据。
对于前两种情况中的任何一种,如果负载没有自然的关联数据,您可以提供一个位于出站通道适配器上游的转换器,以使用此类数据增强负载。 此类转换器可以将原始负载转换为包含原始负载和消息头子集的新对象。 当然,消息头中的活动对象(例如回复通道)不能包含在转换后的负载中。
如果您选择这样的策略,您需要确保连接工厂具有适当的序列化器-反序列化器对来处理此类负载(例如使用 Java 序列化的 DefaultSerializer
和 DefaultDeserializer
,或自定义序列化器和反序列化器)。
TCP 连接工厂中提到的 ByteArray*Serializer
选项,包括默认的 ByteArrayCrLfSerializer
,不支持此类负载,除非转换后的负载是 String
或 byte[]
。
在 2.2 版本之前,当协作通道适配器使用客户端连接工厂时, |
从 5.4 版本开始,多个出站通道适配器和一个 TcpInboundChannelAdapter
可以共享同一个连接工厂。
这允许应用程序同时支持请求/回复和任意服务器 → 客户端消息传递。
有关更多信息,请参阅 TCP 网关。
传输消息头
TCP 是一种流协议。
Serializers
和 Deserializers
在流中划分消息。
在 3.0 之前,只能通过 TCP 传输消息负载(String
或 byte[]
)。
从 3.0 开始,您可以传输选定的消息头以及负载。
但是,“live” 对象,例如 replyChannel
消息头,无法序列化。
通过 TCP 发送消息头信息需要一些额外的配置。
第一步是为 ConnectionFactory
提供一个 MessageConvertingTcpMessageMapper
,它使用 mapper
属性。
此映射器委托给任何 MessageConverter
实现,将消息转换为可由配置的 serializer
和 deserializer
序列化和反序列化的对象。
Spring Integration 提供了一个 MapMessageConverter
,它允许指定一个消息头列表,这些消息头连同负载一起添加到 Map
对象中。
生成的 Map 有两个条目:payload
和 headers
。
headers
条目本身是一个 Map
,并包含选定的消息头。
第二步是提供一个序列化器和反序列化器,它们可以在 Map
和某些线路格式之间进行转换。
这可以是一个自定义的 Serializer
或 Deserializer
,如果您需要与非 Spring Integration 应用程序进行通信,通常需要这样做。
Spring Integration 提供了一个 MapJsonSerializer
,用于将 Map
转换为 JSON。
它使用 Spring Integration JsonObjectMapper
。
如果需要,您可以提供自定义的 JsonObjectMapper
。
默认情况下,序列化器在对象之间插入一个换行符(0x0a
)。
有关更多信息,请参阅 Javadoc。
|
您还可以使用 DefaultSerializer
和 DefaultDeserializer
对 Map
进行标准 Java 序列化。
以下示例显示了使用 JSON 传输 correlationId
、sequenceNumber
和 sequenceSize
消息头的连接工厂配置:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用上述配置发送的消息,其负载为 'something',将在线路上显示如下:
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}