事务支持
本章涵盖 Spring Integration 对事务的支持。 它涵盖以下主题:
理解消息流中的事务
Spring Integration 提供了几个钩子来满足消息流的事务需求。 为了更好地理解这些钩子以及如何从中受益,我们必须首先回顾可以用来启动消息流的六种机制,并了解如何在每种机制中处理这些流的事务需求。
以下六种机制启动消息流(每种机制的详细信息在本手册中提供):
-
网关代理:一个基本的消息网关。
-
消息通道:与
MessageChannel
方法的直接交互(例如,channel.send(message)
)。 -
消息发布者:通过在 Spring bean 上调用方法来启动消息流的方式。
-
入站通道适配器和网关:基于将第三方系统与 Spring Integration 消息系统连接来启动消息流的方式(例如,
[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel
)。 -
调度器:基于预配置调度器分发的调度事件来启动消息流的方式。
-
轮询器:类似于调度器,这是基于预配置轮询器分发的调度或基于间隔的事件来启动消息流的方式。
我们可以将这六种机制分为两大类:
-
用户进程启动的消息流:此类别中的示例场景是调用网关方法或显式将
Message
发送到MessageChannel
。换句话说,这些消息流依赖于第三方进程(例如您编写的一些代码)来启动。 -
守护进程启动的消息流:此类别中的示例场景包括轮询器轮询消息队列以启动带有轮询消息的新消息流,或者调度器通过创建新消息并在预定义时间启动消息流来调度进程。
显然,网关代理、MessageChannel.send(…)
和 MessagePublisher
都属于第一类,而入站适配器和网关、调度器和轮询器属于第二类。
那么,如何在每个类别中的各种场景中处理事务需求,以及 Spring Integration 是否需要为特定场景提供一些明确的事务支持? 或者您可以使用 Spring 的事务支持吗?
Spring 本身提供了对事务管理的一流支持。 因此,我们在这里的目标不是提供新东西,而是使用 Spring 来受益于其现有的事务支持。 换句话说,作为框架,我们必须向 Spring 的事务管理功能公开钩子。 然而,由于 Spring Integration 配置基于 Spring 配置,我们不必总是公开这些钩子,因为 Spring 已经公开了它们。 毕竟,每个 Spring Integration 组件都是一个 Spring Bean。
考虑到这个目标,我们可以再次考虑两种场景:用户进程启动的消息流和守护进程启动的消息流。
由用户进程启动并在 Spring 应用程序上下文中配置的消息流受此类进程的常规事务配置的约束。
因此,它们不需要由 Spring Integration 显式配置来支持事务。
事务可以通过 Spring 的标准事务支持来启动,并且应该如此。
Spring Integration 消息流自然会遵循组件的事务语义,因为它本身由 Spring 配置。
例如,网关或服务激活器方法可以添加 @Transactional
注解,或者在 XML 配置中定义 TransactionInterceptor
,并使用指向应具有事务性的特定方法的切入点表达式。
关键是,在这些场景中,您可以完全控制事务配置和边界。
然而,当涉及到由守护进程启动的消息流时,情况有些不同。 尽管由开发人员配置,但这些流不直接涉及人工或其他进程来启动。 这些是基于触发器的流,由触发器进程(守护进程)根据进程的配置启动。 例如,我们可以让调度器在每个星期五晚上启动一个消息流。 我们还可以配置一个触发器,每秒启动一个消息流,等等。 因此,我们需要一种方法让这些基于触发器的进程知道我们打算使生成的 संदेश流具有事务性,以便在启动新消息流时可以创建事务上下文。 换句话说,我们需要公开一些事务配置,但仅限于委托给 Spring 已经提供的事务支持(就像我们在其他场景中所做的那样)。
轮询器事务支持
Spring Integration 为轮询器提供事务支持。
轮询器是一种特殊类型的组件,因为在轮询器任务中,我们可以对本身具有事务性的资源调用 receive()
,从而将 receive()
调用包含在事务边界内,这允许在任务失败时回滚。
如果我们要为通道添加相同的支持,添加的事务将影响从 send()
调用开始的所有下游组件。
这为事务划分提供了相当宽泛的范围,没有任何强有力的理由,特别是当 Spring 已经提供了几种方法来解决任何下游组件的事务需求时。
然而,receive()
方法被包含在事务边界内是轮询器的“强有力理由”。
任何时候配置轮询器时,您都可以使用 transactional
子元素及其属性提供事务配置,如以下示例所示:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</int:poller>
上述配置看起来类似于原生的 Spring 事务配置。
您仍然必须提供对事务管理器的引用,并指定事务属性或依赖默认值(例如,如果未指定 'transaction-manager' 属性,则默认为名为 'transactionManager' 的 bean)。
在内部,该过程被封装在 Spring 的原生事务中,其中 TransactionInterceptor
负责处理事务。
有关如何配置事务管理器、事务管理器类型(例如 JTA、Datasource 等)以及与事务配置相关的其他详细信息,请参阅 Spring Framework 参考指南。
通过上述配置,此轮询器启动的所有消息流都具有事务性。 有关轮询器事务配置的更多信息和详细信息,请参阅 轮询和事务。
除了事务,您在运行轮询器时可能还需要解决其他几个横切关注点。
为了帮助解决这个问题,轮询器元素接受 <advice-chain>
子元素,它允许您定义一个自定义的建议实例链以应用于轮询器。
(有关更多详细信息,请参阅 可轮询消息源。)
在 Spring Integration 2.0 中,轮询器经过了重构,现在使用代理机制来解决事务问题以及其他横切关注点。
这项工作产生的一个重大变化是,我们使 <transactional>
和 <advice-chain>
元素互斥。
这样做的理由是,如果您需要多个建议并且其中一个是事务建议,您可以将其包含在 <advice-chain>
中,就像以前一样方便,但控制力更强,因为您现在可以选择按所需顺序定位建议。
以下示例展示了如何执行此操作:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</int:poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
上述示例展示了 Spring 事务建议 (txAdvice
) 的基本基于 XML 的配置,并将其包含在轮询器定义的 <advice-chain>
中。
如果您只需要解决轮询器的事务问题,仍然可以使用 <transactional>
元素以方便。
事务边界
另一个重要因素是消息流中事务的边界。 当事务启动时,事务上下文绑定到当前线程。 因此,无论您的消息流中有多少个端点和通道,只要您确保流在同一线程上继续,您的事务上下文就会被保留。 一旦您通过引入 可轮询通道 或 执行器通道 或在某个服务中手动启动新线程来中断它,事务边界也将被打破。 本质上,事务将在此处结束,如果线程之间发生了成功的移交,则流将被视为成功,并发送 COMMIT 信号,即使流将继续并且可能仍然导致下游某个地方出现异常。 如果这样的流是同步的,该异常可以抛回给消息流的启动器,该启动器也是事务上下文的启动器,事务将导致 ROLLBACK。 中间方法是在线程边界被打破的任何点使用事务通道。 例如,您可以使用委托给事务性 MessageStore 策略的基于队列的通道,或者您可以使用基于 JMS 的通道。
事务同步
在某些环境中,将操作与包含整个流的事务同步会很有帮助。
例如,考虑一个在流开始时执行大量数据库更新的 <file:inbound-channel-adapter/>
。
如果事务提交,我们可能希望将文件移动到 success
目录,而如果事务回滚,我们可能希望将其移动到 failure
目录。
Spring Integration 2.2 引入了将这些操作与事务同步的能力。
此外,如果您没有“真实”事务但仍希望在成功或失败时执行不同的操作,则可以配置 PseudoTransactionManager
。
有关更多信息,请参阅 pseudo-transactions。
以下列表显示了此功能的关键策略接口:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
工厂负责创建 TransactionSynchronization
对象。
您可以实现自己的,也可以使用框架提供的:DefaultTransactionSynchronizationFactory
。
此实现返回一个 TransactionSynchronization
,它委托给 TransactionSynchronizationProcessor
的默认实现:ExpressionEvaluatingTransactionSynchronizationProcessor
。
此处理器支持三个 SpEL 表达式:beforeCommitExpression
、afterCommitExpression
和 afterRollbackExpression
。
对于熟悉事务的人来说,这些操作应该是不言自明的。
在每种情况下,#root
变量都是原始 Message
。
在某些情况下,还会提供其他 SpEL 变量,具体取决于轮询器轮询的 MessageSource
。
例如,MongoDbMessageSource
提供 #mongoTemplate
变量,该变量引用消息源的 MongoTemplate
。
同样,RedisStoreMessageSource
提供 #store
变量,该变量引用轮询创建的 RedisStore
。
要为特定轮询器启用该功能,您可以通过使用 synchronization-factory
属性在轮询器的 <transactional/>
元素上提供对 TransactionSynchronizationFactory
的引用。
从 5.0 版本开始,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory
,当未配置 TransactionSynchronizationFactory
但在建议链中存在 TransactionInterceptor
类型的建议时,此工厂默认应用于轮询端点。
当使用任何开箱即用的 TransactionSynchronizationFactory
实现时,轮询端点会将轮询的消息绑定到当前事务上下文,如果事务建议之后抛出异常,则在 MessagingException
中将其作为 failedMessage
提供。
当使用不实现 TransactionInterceptor
的自定义事务建议时,您可以显式配置 PassThroughTransactionSynchronizationFactory
以实现此行为。
在任何一种情况下,MessagingException
都成为发送到 errorChannel
的 ErrorMessage
的有效负载,并且原因是建议抛出的原始异常。
以前,ErrorMessage
的有效负载是建议抛出的原始异常,并且不提供对 failedMessage
信息的引用,这使得难以确定事务提交问题的原因。
为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。 以下示例展示了如何使用命名空间配置文件入站通道适配器:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 评估的结果将作为有效负载发送到 committedChannel
或 rolledBackChannel
(在这种情况下,这将是 Boolean.TRUE
或 Boolean.FALSE
——java.io.File.renameTo()
方法调用的结果)。
如果您希望发送整个有效负载以进行进一步的 Spring Integration 处理,请使用 'payload' 表达式。
重要的是要理解,这会将操作与事务同步。
它不会使本身不具有事务性的资源真正具有事务性。
相反,事务(无论是 JDBC 还是其他)在轮询之前启动,并在流完成时提交或回滚,然后执行同步操作。
如果您提供自定义 TransactionSynchronizationFactory
,它负责创建资源同步,导致事务完成时自动解除绑定绑定的资源。
默认的 TransactionSynchronizationFactory
通过返回 ResourceHolderSynchronization
的子类来实现这一点,其中默认的 shouldUnbindAtCompletion()
返回 true
。
除了 after-commit
和 after-rollback
表达式之外,还支持 before-commit
。
在这种情况下,如果评估(或下游处理)抛出异常,则事务将回滚而不是提交。
伪事务
阅读 transaction-synchronization 部分后,您可能会认为在流完成时执行这些“成功”或“失败”操作会很有用,即使轮询器下游没有“真实”的事务性资源(例如 JDBC)。
例如,考虑一个 <file:inbound-channel-adapter/>
,后面跟着一个 <ftp:outbout-channel-adapter/>
。
这些组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。
为了提供此功能,框架提供了 PseudoTransactionManager
,即使没有涉及真正的事务性资源,也可以实现上述配置。
如果流正常完成,则调用 beforeCommit
和 afterCommit
同步。
如果失败,则调用 afterRollback
同步。
因为它不是真正的事务,所以不会发生实际的提交或回滚。
伪事务是用于启用同步功能的工具。
要使用 PseudoTransactionManager
,您可以将其定义为 <bean/>
,就像配置真正的事务管理器一样。
以下示例展示了如何执行此操作:
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />