消息存储
企业集成模式 (EIP) 一书识别了多种能够缓冲消息的模式。例如,聚合器会缓冲消息直到它们可以被释放,而 QueueChannel
会缓冲消息直到消费者明确地从该通道接收这些消息。由于消息流中任何点都可能发生故障,因此缓冲消息的 EIP 组件也引入了消息可能丢失的风险点。为了降低消息丢失的风险,EIP 定义了 消息存储 模式,该模式允许 EIP 组件存储消息,通常是在某种类型的持久化存储(例如 RDBMS)中。Spring Integration 通过以下方式支持消息存储模式:
-
定义一个
org.springframework.integration.store.MessageStore
策略接口 -
提供该接口的多种实现
-
在所有具有缓冲消息能力的组件上公开一个
message-store
属性,以便您可以注入任何实现MessageStore
接口的实例。
关于如何配置特定消息存储实现以及如何将 MessageStore
实现注入特定缓冲组件的详细信息在整个手册中都有描述(请参阅特定组件,例如 QueueChannel、Aggregator、Delayer 等)。以下两对示例展示了如何为 QueueChannel
和聚合器添加消息存储的引用:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
默认情况下,消息使用 o.s.i.store.SimpleMessageStore
(MessageStore
的一个实现)存储在内存中。这对于开发或简单的低流量环境可能没问题,在这些环境中,非持久化消息的潜在丢失不是问题。然而,典型的生产应用程序需要更健壮的选项,不仅要降低消息丢失的风险,还要避免潜在的内存不足错误。因此,我们还为各种数据存储提供了 MessageStore
实现。以下是所有支持的实现的完整列表:
-
Hazelcast 消息存储:使用 Hazelcast 分布式缓存存储消息
-
JDBC 消息存储:使用 RDBMS 存储消息
-
Redis 消息存储:使用 Redis 键/值数据存储存储消息
-
MongoDB 消息存储:使用 MongoDB 文档存储存储消息
然而,在使用 MessageStore
的持久化实现时,请注意一些限制。消息数据(payload 和 headers)通过不同的序列化策略进行序列化和反序列化,具体取决于 MessageStore
的实现。例如,当使用 JdbcMessageStore
时,默认情况下只持久化 Serializable
数据。在这种情况下,非 Serializable
的头部在序列化之前会被移除。此外,请注意由传输适配器(例如 FTP、HTTP、JMS 等)注入的特定于协议的头部。例如,<http:inbound-channel-adapter/>
将 HTTP 头部映射到消息头部,其中一个是非 Serializable
org.springframework.http.MediaType
实例的 ArrayList
。但是,您可以将自己的 Serializer
和 Deserializer
策略接口实现注入到某些 MessageStore
实现(例如 JdbcMessageStore
)中,以更改序列化和反序列化的行为。请特别注意表示某些类型数据的头部。例如,如果其中一个头部包含某个 Spring bean 的实例,则在反序列化时,您可能会得到该 bean 的不同实例,这会直接影响框架创建的一些隐式头部(例如 REPLY_CHANNEL
或 ERROR_CHANNEL
)。目前,它们是不可序列化的,但即使它们是可序列化的,反序列化的通道也不会表示预期的实例。从 Spring Integration 3.0 版本开始,您可以通过配置一个头部增强器来解决此问题,该增强器在将通道注册到 HeaderChannelRegistry
后用名称替换这些头部。此外,请考虑当您按以下方式配置消息流时会发生什么:网关 → 队列通道(由持久化消息存储支持)→ 服务激活器。该网关会创建一个临时回复通道,当服务激活器的轮询器从队列读取时,该通道就会丢失。同样,您可以使用头部增强器用 String
表示形式替换头部。有关更多信息,请参阅 头部增强器。
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore
:用于实现QueueChannel
实例的特定操作 -
PriorityCapableChannelMessageStore
:用于标记MessageStore
实现以用于PriorityChannel
实例,并为持久化消息提供优先级顺序。
实际行为取决于实现。该框架提供了以下实现,它们可以用作 QueueChannel
和 PriorityChannel
的持久化 MessageStore
:
SimpleMessageStore
的注意事项从 4.1 版本开始,SimpleMessageStore
在调用 getMessageGroup()
时不再复制消息组。对于大型消息组,这曾是一个显著的性能问题。4.0.1 引入了一个布尔型 copyOnGet
属性,允许您控制此行为。当由聚合器内部使用时,此属性被设置为 false
以提高性能。现在它默认为 false
。现在,在聚合器等组件之外访问组存储的用户将获得聚合器正在使用的组的直接引用,而不是副本。在聚合器之外操作组可能会导致不可预测的结果。因此,您应该避免此类操作或将 copyOnGet
属性设置为 true
。
使用 MessageGroupFactory
从 4.3 版本开始,一些 MessageGroupStore
实现可以注入自定义的 MessageGroupFactory
策略,以创建和自定义 MessageGroupStore
使用的 MessageGroup
实例。这默认为 SimpleMessageGroupFactory
,它根据 GroupType.HASH_SET
(LinkedHashSet
) 内部集合生成 SimpleMessageGroup
实例。其他可能的选项是 SYNCHRONISED_SET
和 BLOCKING_QUEUE
,其中最后一个可用于恢复以前的 SimpleMessageGroup
行为。此外,PERSISTENT
选项也可用。有关更多信息,请参阅下一节。从 5.0.1 版本开始,LIST
选项也可用,当组中消息的顺序和唯一性无关紧要时。
持久化 MessageGroupStore
和延迟加载
从 4.3 版本开始,所有持久化 MessageGroupStore
实例都以延迟加载的方式从存储中检索 MessageGroup
实例及其 messages
。在大多数情况下,这对关联 MessageHandler
实例(请参阅 聚合器 和 重排序器)很有用,因为每次关联操作时从存储中加载整个 MessageGroup
会增加开销。
您可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
选项从配置中关闭延迟加载行为。
我们对 MongoDB MessageStore
(MongoDB 消息存储) 和 <aggregator>
(聚合器) 上的延迟加载的性能测试使用了类似于以下的自定义 release-strategy
:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它为 1000 条简单消息生成了类似以下的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,从 5.5 版本开始,所有持久化 MessageGroupStore
实现都提供了一个基于目标数据库流 API 的 streamMessagesForGroup(Object groupId)
契约。这在组在存储中非常大时提高了资源利用率。在框架内部,这个新的 API 在 延迟器(例如)中被使用,当它在启动时重新调度持久化消息时。返回的 Stream<Message<?>>
必须在处理结束时关闭,例如通过 try-with-resources
自动关闭。无论何时使用 PersistentMessageGroup
,其 streamMessages()
都委托给 MessageGroupStore.streamMessagesForGroup()
。
消息组条件
从 5.5 版本开始,MessageGroup
抽象提供了一个 condition
字符串选项。此选项的值可以是任何可以在以后出于任何原因进行解析以对组做出决定的内容。例如,来自 关联消息处理器 的 ReleaseStrategy
可能会查询组的此属性,而不是迭代组中的所有消息。MessageGroupStore
公开了一个 setGroupCondition(Object groupId, String condition)
API。为此,AbstractCorrelatingMessageHandler
中添加了一个 setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
选项。此函数在每条消息添加到组后以及组的现有条件上进行评估。实现可以决定返回一个新值、现有值或将目标条件重置为 null
。condition
的值可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。例如,来自 文件聚合器 组件的 FileMarkerReleaseStrategy
将 FileHeaders.LINE_COUNT
头部的值从 FileSplitter.FileMarker.Mark.END
消息填充到组的条件中,并在其 canRelease()
中查询它,将组大小与此条件中的值进行比较。这样,它就不必迭代组中的所有消息来查找带有 FileHeaders.LINE_COUNT
头部的 FileSplitter.FileMarker.Mark.END
消息。它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。
此外,为了方便配置,引入了 GroupConditionProvider
契约。AbstractCorrelatingMessageHandler
检查提供的 ReleaseStrategy
是否实现了此接口,并提取一个 conditionSupplier
用于组条件评估逻辑。
使用 LockRegistry
从 6.5 版本开始,AbstractMessageGroupStore
抽象通过锁操作消息组的元数据。此锁获取 groupId 并由 LockRegister
生成。其目的是操作消息和消息组的原子性。在多线程中,同时添加或删除消息或更新元数据时,如果缺少锁,某些实现可能会出现消息组错误。默认情况下,使用 DefaultLockRegistry
,任何 LockRegister
都可以通过 AbstractMessageGroupStore.setLockRegistry()
注入,通常是用于相同持久存储的实现。有关更多信息,请参阅 分布式锁。