Message Store
The Enterprise Integration Patterns (EIP) book identifies several patterns that have the ability to buffer messages.
For example, an aggregator buffers messages until they can be released, and a QueueChannel buffers messages until consumers explicitly receive those messages from that channel.
Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.
To mitigate the risk of losing messages, EIP defines the message store pattern, which lets EIP components store messages, typically in some type of persistent store (such as an RDBMS).
Spring Integration provides support for the message store pattern by:
- Defining an org.springframework.integration.store.MessageStore strategy interface
- Providing several implementations of this interface
- Exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface
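Here is a minimal plain-Java sketch of the strategy-interface idea, with no Spring Integration dependency. `SketchMessage`, `SketchMessageStore`, `InMemorySketchStore`, and `SketchBuffer` are hypothetical names. The method set is a simplified assumption, not the exact `MessageStore` contract. The sketch only shows that a buffering component depends on the interface, so you can inject any implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified message: an id, a payload and immutable headers.
final class SketchMessage {
    final UUID id = UUID.randomUUID();
    final Object payload;
    final Map<String, Object> headers;

    SketchMessage(Object payload, Map<String, ?> headers) {
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(new HashMap<String, Object>(headers));
    }
}

// Hypothetical strategy interface; buffering components depend only on this contract.
interface SketchMessageStore {
    SketchMessage addMessage(SketchMessage message);
    SketchMessage getMessage(UUID id);
    SketchMessage removeMessage(UUID id);
    long getMessageCount();
}

// In-memory implementation, similar in spirit to SimpleMessageStore:
// fast, but its contents are lost when the JVM stops.
final class InMemorySketchStore implements SketchMessageStore {
    private final Map<UUID, SketchMessage> messages = new ConcurrentHashMap<>();

    public SketchMessage addMessage(SketchMessage message) {
        messages.put(message.id, message);
        return message;
    }

    public SketchMessage getMessage(UUID id) { return messages.get(id); }

    public SketchMessage removeMessage(UUID id) { return messages.remove(id); }

    public long getMessageCount() { return messages.size(); }
}

// Hypothetical buffering component: the store is injected, never created internally.
final class SketchBuffer {
    private final SketchMessageStore store;

    SketchBuffer(SketchMessageStore store) { this.store = store; }

    UUID buffer(SketchMessage message) { return store.addMessage(message).id; }

    SketchMessage release(UUID id) { return store.removeMessage(id); }
}
```

Replacing `InMemorySketchStore` with a persistent implementation would require no change to `SketchBuffer`. In configuration, the message-store attribute plays the same role.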
Details on how to configure a specific message store implementation and how to inject a MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer, and others).
The following pair of examples shows how to add a reference to a message store for a QueueChannel and for an aggregator:
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
</int:channel>
<int:aggregator message-store="refToMessageStore"/>
By default, messages are stored in memory by using o.s.i.store.SimpleMessageStore, an implementation of MessageStore.
That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern.
However, the typical production application needs a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors.
Therefore, we also provide MessageStore implementations for a variety of data stores.
The following is a complete list of supported implementations:
- Hazelcast Message Store: Uses a Hazelcast distributed cache to store messages
- JDBC Message Store: Uses an RDBMS to store messages
- Redis Message Store: Uses a Redis key/value datastore to store messages
- MongoDB Message Store: Uses a MongoDB document store to store messages
However, be aware of some limitations when using persistent implementations of the MessageStore.
The message data (payload and headers) is serialized and deserialized by using different serialization strategies, depending on the implementation of the MessageStore.
For example, when using JdbcMessageStore, only Serializable data is persisted by default.
In this case, non-Serializable headers are removed before serialization occurs.
Also, be aware of the protocol-specific headers that are injected by transport adapters (such as FTP, HTTP, JMS, and others).
For example, <http:inbound-channel-adapter/> maps HTTP headers into message headers, and one of them is an ArrayList of non-serializable org.springframework.http.MediaType instances.
However, you can inject your own implementation of the Serializer and Deserializer strategy interfaces into some MessageStore implementations (such as JdbcMessageStore) to change the behavior of serialization and deserialization.
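The following plain-Java sketch illustrates this serialization pitfall using standard Java serialization. `HeaderSerializationSketch` is hypothetical and does not reflect how `JdbcMessageStore` is implemented. It filters headers by actually attempting to serialize each value. That approach also catches the `ArrayList`-of-`MediaType` case, because an `ArrayList` is `Serializable` even when its elements are not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mimics "drop non-Serializable headers before persisting".
final class HeaderSerializationSketch {

    // True only if the value survives Java serialization. An instanceof Serializable
    // check is not enough: an ArrayList is Serializable, but writing it fails
    // when one of its elements is not.
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    // Keeps only the headers that can be persisted, in their original order.
    static Map<String, Object> persistableHeaders(Map<String, Object> headers) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            if (isSerializable(header.getValue())) {
                result.put(header.getKey(), header.getValue());
            }
        }
        return result;
    }

    // Serializes and deserializes a value, as a store would on write and read.
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A custom Serializer/Deserializer pair, as mentioned above, is the supported way to change this behavior in a real store. The sketch only demonstrates why some headers cannot be written as-is.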
Pay special attention to the headers that represent certain types of data.
For example, if one of the headers contains an instance of some Spring bean, upon deserialization, you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (such as REPLY_CHANNEL or ERROR_CHANNEL).
Currently, they are not serializable, but, even if they were, the deserialized channel would not represent the expected instance.
Beginning with Spring Integration version 3.0, you can resolve this issue with a header enricher configured to replace these headers with a name after registering the channel with the HeaderChannelRegistry.
Also, consider what happens when you configure a message flow as follows: gateway → queue-channel (backed by a persistent Message Store) → service-activator.
That gateway creates a temporary reply channel, which is lost by the time the service-activator’s poller reads from the queue.
Again, you can use the header enricher to replace the headers with a String representation.
For more information, see Header Enricher.
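The channel-to-name workaround can be sketched in plain Java. Before the message is persisted, the live channel is swapped for a String name. After the message is read back, the name is resolved to the same in-memory instance. `SketchChannelRegistry` and its methods are hypothetical and are not the `HeaderChannelRegistry` API. In this sketch, the mapping lives only in the JVM that registered the channel.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, in-memory registry illustrating the channel-to-name idea.
final class SketchChannelRegistry {
    private final Map<String, Object> channelsByName = new ConcurrentHashMap<>();

    // Before persisting: swap the live channel object for a String header value.
    String channelToName(Object channel) {
        String name = UUID.randomUUID().toString();
        channelsByName.put(name, channel);
        return name;
    }

    // After reading the message back: resolve the name to the original instance,
    // or null if this registry never saw the name.
    Object nameToChannel(String name) {
        return channelsByName.get(name);
    }
}
```

The resolved object is the original instance, not a deserialized copy. That instance identity is what a temporary reply channel needs.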
Spring Integration 4.0 introduced two new interfaces:
- ChannelMessageStore: To implement operations specific to QueueChannel instances
- PriorityCapableChannelMessageStore: To mark MessageStore implementations to be used for PriorityChannel instances and to provide priority order for persisted messages
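To illustrate what "priority order for persisted messages" means, the following in-memory sketch orders buffered messages by priority. `PrioritySketchStore` is hypothetical. The ordering rule (a larger number wins, FIFO among equal priorities) and the lack of persistence are simplifying assumptions; how a real implementation orders messages depends on that implementation.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical in-memory stand-in for a priority-capable channel store.
// Assumptions: a larger number means higher priority, and messages with
// equal priority keep their insertion (FIFO) order.
final class PrioritySketchStore {

    private static final class Entry {
        final String payload;
        final int priority;
        final long sequence;

        Entry(String payload, int priority, long sequence) {
            this.payload = payload;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    // Highest priority first; the insertion sequence breaks ties.
    private static final Comparator<Entry> ORDER =
            Comparator.comparingInt((Entry e) -> e.priority).reversed()
                    .thenComparingLong(e -> e.sequence);

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(ORDER);
    private long nextSequence;

    synchronized void add(String payload, int priority) {
        queue.add(new Entry(payload, priority, nextSequence++));
    }

    // Returns the next payload in priority order, or null when the store is empty.
    synchronized String poll() {
        Entry next = queue.poll();
        return next == null ? null : next.payload;
    }
}
```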
The real behavior depends on the implementation.
The framework provides the following implementations, which can be used as a persistent MessageStore for QueueChannel and PriorityChannel:
SimpleMessageStore
Starting with version 4.1, the SimpleMessageStore no longer copies the message group when calling getMessageGroup().
For large message groups, this was a significant performance problem.
Version 4.0.1 introduced a boolean copyOnGet property that lets you control this behavior.
When used internally by the aggregator, this property was set to false to improve performance.
It is now false by default.
Users accessing the group store outside of components such as aggregators now get a direct reference to the group being used by the aggregator instead of a copy. Manipulation of the group outside the aggregator may cause unpredictable results.
For this reason, you should either not perform such manipulation or set the copyOnGet property to true.
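The copyOnGet trade-off can be sketched with a hypothetical in-memory group store. `CopyOnGetSketchStore` is not `SimpleMessageStore`, and its API is invented for illustration. With copying disabled, the caller receives the live collection. An outside change therefore alters the group that the store (and, in real use, the aggregator) is working with.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;

// Hypothetical group store illustrating the copyOnGet trade-off.
final class CopyOnGetSketchStore {
    private final Map<Object, Collection<String>> groups = new HashMap<>();
    private final boolean copyOnGet;

    CopyOnGetSketchStore(boolean copyOnGet) {
        this.copyOnGet = copyOnGet;
    }

    void addToGroup(Object groupId, String message) {
        groups.computeIfAbsent(groupId, id -> new LinkedHashSet<>()).add(message);
    }

    Collection<String> getGroup(Object groupId) {
        Collection<String> group = groups.getOrDefault(groupId, new LinkedHashSet<>());
        // copyOnGet = true: an O(n) copy per call; caller changes stay private.
        // copyOnGet = false: no copy; caller changes leak into the stored group.
        return copyOnGet ? new ArrayList<>(group) : group;
    }
}
```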
Using MessageGroupFactory
Starting with version 4.3, some MessageGroupStore implementations can be injected with a custom MessageGroupFactory strategy to create and customize the MessageGroup instances used by the MessageGroupStore.
This defaults to a SimpleMessageGroupFactory, which produces SimpleMessageGroup instances based on the GroupType.HASH_SET (LinkedHashSet) internal collection.
Other possible options are SYNCHRONISED_SET and BLOCKING_QUEUE, where the last one can be used to reinstate the previous SimpleMessageGroup behavior.
Also, the PERSISTENT option is available.
See the next section for more information.
Starting with version 5.0.1, the LIST option is also available for when the order and uniqueness of messages in the group do not matter.
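The group types differ mainly in the collection that holds a group's messages. The sketch below is a hypothetical factory. `SketchGroupType` and `SketchGroupFactory` are invented names, and `PERSISTENT` is left out. Only the `HASH_SET` to `LinkedHashSet` mapping comes from the text above; the other mappings are assumptions chosen to match the type names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical mirror of the group types named in the text (PERSISTENT omitted).
enum SketchGroupType { HASH_SET, SYNCHRONISED_SET, BLOCKING_QUEUE, LIST }

final class SketchGroupFactory {
    private final SketchGroupType type;

    SketchGroupFactory(SketchGroupType type) {
        this.type = type;
    }

    // Creates the internal collection for a new group; mappings other than
    // HASH_SET are assumptions for illustration.
    <T> Collection<T> newMessageCollection() {
        switch (type) {
            case HASH_SET:
                return new LinkedHashSet<>(); // unique messages, insertion order
            case SYNCHRONISED_SET:
                return Collections.<T>synchronizedSet(new LinkedHashSet<T>());
            case BLOCKING_QUEUE:
                return new LinkedBlockingQueue<>(); // FIFO, duplicates allowed
            case LIST:
                return new ArrayList<>(); // no uniqueness check at all
            default:
                throw new IllegalStateException("Unknown group type: " + type);
        }
    }
}
```

The design choice is a trade-off. The set-based types pay for a uniqueness check on every add, while a list skips it, which is why `LIST` fits groups where order and uniqueness do not matter.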
Persistent MessageGroupStore and Lazy-Load
Starting with version 4.3, all persistent MessageGroupStore instances retrieve MessageGroup instances and their messages from the store in the lazy-load manner.
This is useful for the correlation MessageHandler instances (see Aggregator and Resequencer), because loading the entire MessageGroup from the store on each correlation operation would add overhead.
You can use the AbstractMessageGroupStore.setLazyLoadMessageGroups(false) option to switch off the lazy-load behavior from the configuration.
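Lazy loading pays off because a release check such as `size() == 1000` needs only the group size, not the messages themselves. The following sketch models that idea with a hypothetical `LazySketchGroup`. The size comes from a cheap query, and the full message list is loaded on first access and then cached. This is a simplified illustration, not the persistent `MessageGroup` implementation.

```java
import java.util.List;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

// Hypothetical lazily loaded group. The size comes from a cheap query (for example,
// a count in the store); the full message list is fetched only on first access.
final class LazySketchGroup {
    private final IntSupplier sizeQuery;
    private final Supplier<List<String>> loader;
    private List<String> messages; // null until first loaded

    LazySketchGroup(IntSupplier sizeQuery, Supplier<List<String>> loader) {
        this.sizeQuery = sizeQuery;
        this.loader = loader;
    }

    // Enough for a size-based release check: no messages are loaded.
    synchronized int size() {
        return messages != null ? messages.size() : sizeQuery.getAsInt();
    }

    // Loads the messages once, then serves them from memory.
    synchronized List<String> getMessages() {
        if (messages == null) {
            messages = loader.get();
        }
        return messages;
    }
}
```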
Our performance tests for lazy-load on the MongoDB MessageStore (MongoDB Message Store) and <aggregator> (Aggregator) use a custom release-strategy similar to the following:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
It produces results similar to the following for 1000 simple messages:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
Starting with version 5.5, all the persistent MessageGroupStore implementations provide a streamMessagesForGroup(Object groupId) contract based on the target database streaming API.
This improves resource utilization when groups in the store are very large.
Internally, the framework uses this API in the Delayer (for example) when it reschedules persisted messages on startup.
A returned Stream<Message<?>> must be closed at the end of processing, for example, automatically by using try-with-resources.
Whenever a PersistentMessageGroup is used, its streamMessages() delegates to MessageGroupStore.streamMessagesForGroup().
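The following sketch shows why a returned stream must be closed, using only `java.util.stream`. `StreamingSketchStore` is hypothetical, and its `streamMessagesForGroup` merely imitates the described contract. The `onClose` handler stands in for releasing a database cursor or connection. A terminal operation such as `count()` does not close a `Stream`, so without try-with-resources the resource stays open.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical store whose stream holds an open "cursor" until it is closed.
final class StreamingSketchStore {
    final AtomicBoolean cursorOpen = new AtomicBoolean(false);
    private final List<String> rows;

    StreamingSketchStore(List<String> rows) {
        this.rows = rows;
    }

    // groupId is ignored in this sketch; a real store would query by it.
    Stream<String> streamMessagesForGroup(Object groupId) {
        cursorOpen.set(true); // e.g. a database cursor is acquired here
        return rows.stream().onClose(() -> cursorOpen.set(false)); // released on close()
    }

    // Consumer side: try-with-resources guarantees close(), even if processing throws.
    static long countMatching(StreamingSketchStore store, Object groupId, String prefix) {
        try (Stream<String> messages = store.streamMessagesForGroup(groupId)) {
            return messages.filter(m -> m.startsWith(prefix)).count();
        }
    }
}
```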
Message Group Condition
Starting with version 5.5, the MessageGroup abstraction provides a condition string option.
The value of this option can be anything that can be parsed later to make a decision about the group.
For example, a ReleaseStrategy from a correlation message handler may consult this property from the group instead of iterating over all the messages in the group.
The MessageGroupStore exposes a setGroupCondition(Object groupId, String condition) API.
For this purpose, a setGroupConditionSupplier(BiFunction<Message<?>, String, String>) option has been added to the AbstractCorrelatingMessageHandler.
This function is evaluated against each message after it has been added to the group, together with the existing condition of the group.
The implementation may decide to return a new value, return the existing value, or reset the target condition to null.
The value for a condition can be JSON, a SpEL expression, a number, or anything that can be serialized as a string and parsed afterward.
For example, the FileMarkerReleaseStrategy from the File Aggregator component populates a condition into a group from the FileHeaders.LINE_COUNT header of the FileSplitter.FileMarker.Mark.END message and consults it from its canRelease(), comparing the group size with the value in this condition.
This way, it does not iterate over all the messages in the group to find a FileSplitter.FileMarker.Mark.END message with the FileHeaders.LINE_COUNT header.
It also allows the end marker to arrive at the aggregator before all the other records; for example, when processing a file in a multi-threaded environment.
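The FileMarkerReleaseStrategy approach can be sketched with plain Java types. Everything below is hypothetical and simplified: `ConditionSketch`, its `Msg` and `Group` classes, and the `lineCount` header stand in for `Message`, `MessageGroup`, and `FileHeaders.LINE_COUNT`. The supplier has the same shape as the `BiFunction<Message<?>, String, String>` described above: it takes the new message and the current condition and returns the next condition. The release check compares the group size with the condition instead of scanning for the end marker. The sketch assumes the group holds the data lines plus the end marker itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified types; not Spring Integration's Message or MessageGroup.
final class ConditionSketch {

    static final class Msg {
        final String payload;
        final Map<String, ?> headers;

        Msg(String payload, Map<String, ?> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    static final class Group {
        final List<Msg> messages = new ArrayList<>();
        String condition; // free-form string; here, the expected line count
    }

    // (new message, current condition) -> next condition.
    // Only the end marker carries "lineCount"; other messages keep the current value.
    static final BiFunction<Msg, String, String> CONDITION_SUPPLIER = (msg, current) -> {
        Object lineCount = msg.headers.get("lineCount");
        return lineCount != null ? lineCount.toString() : current;
    };

    // Adds the message, then re-evaluates the group condition.
    static void add(Group group, Msg msg) {
        group.messages.add(msg);
        group.condition = CONDITION_SUPPLIER.apply(msg, group.condition);
    }

    // Consults only the condition and the size; "- 1" excludes the end marker itself.
    static boolean canRelease(Group group) {
        return group.condition != null
                && group.messages.size() - 1 == Long.parseLong(group.condition);
    }
}
```

Because the condition is stored on the group, the order of arrival does not matter: the end marker may come first, and the group is still released exactly when the last data line arrives.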
In addition, for configuration convenience, a GroupConditionProvider contract has been introduced.
The AbstractCorrelatingMessageHandler checks whether the provided ReleaseStrategy implements this interface and, if so, extracts a conditionSupplier for the group condition evaluation logic.
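The `instanceof` check described above can be sketched as follows. `SketchReleaseStrategy`, `SketchConditionProvider`, `CountingReleaseStrategy`, and `SketchCorrelatingHandler` are hypothetical, simplified stand-ins for `ReleaseStrategy`, `GroupConditionProvider`, and `AbstractCorrelatingMessageHandler`. Their method names and signatures are not the Spring Integration API.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for a release strategy.
interface SketchReleaseStrategy {
    boolean canRelease(int groupSize, String condition);
}

// Hypothetical stand-in for a condition provider: (payload, current condition) -> new condition.
interface SketchConditionProvider {
    BiFunction<String, String, String> getConditionSupplier();
}

// A release strategy that also supplies the condition it later consults.
final class CountingReleaseStrategy implements SketchReleaseStrategy, SketchConditionProvider {
    public boolean canRelease(int groupSize, String condition) {
        return condition != null && groupSize >= Integer.parseInt(condition);
    }

    public BiFunction<String, String, String> getConditionSupplier() {
        // A payload such as "expect:3" sets the condition; any other payload keeps it.
        return (payload, current) -> payload.startsWith("expect:") ? payload.substring(7) : current;
    }
}

final class SketchCorrelatingHandler {
    final SketchReleaseStrategy releaseStrategy;
    final BiFunction<String, String, String> conditionSupplier; // null if not provided

    SketchCorrelatingHandler(SketchReleaseStrategy releaseStrategy) {
        this.releaseStrategy = releaseStrategy;
        // The convenience: no separate supplier wiring when the strategy provides one.
        this.conditionSupplier = releaseStrategy instanceof SketchConditionProvider
                ? ((SketchConditionProvider) releaseStrategy).getConditionSupplier()
                : null;
    }
}
```

Keeping the condition logic next to the release logic means the two cannot drift apart. The strategy that reads the condition is also the one that defines how the condition is produced.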