Transactions
Spring Rabbit 框架支持在具有多种不同语义的同步和异步用例中自动事务管理,这些语义可以通过声明性方式进行选择,这对于 Spring 事务的现有用户来说很熟悉。这让许多甚至大多数常见的消息传递模式得以轻松实现。
The Spring Rabbit framework has support for automatic transaction management in the synchronous and asynchronous use cases with a number of different semantics that can be selected declaratively, as is familiar to existing users of Spring transactions. This makes many if not most common messaging patterns easy to implement.
有两种方法可以向框架指示所需的事务语义。在 RabbitTemplate
和 SimpleMessageListenerContainer
中,都有一个标志位 channelTransacted
,如果为 true
,则会指示框架使用事务通道,并根据结果以提交或回滚操作(发送或接收)的结尾,其中异常指示回滚。另一个指示是使用某个 Spring PlatformTransactionManager
实现来提供外部事务作为正在进行操作的上下文。如果框架在发送或接收消息时已有事务在进行,且 channelTransacted
标志位为 true
,则会将消息传递事务的提交或回滚操作推迟到当前事务结束时。如果 channelTransacted
标志位为 false
,则不会将任何事务语义应用到消息传递操作(会自动确认)。
There are two ways to signal the desired transaction semantics to the framework.
In both the RabbitTemplate
and SimpleMessageListenerContainer
, there is a flag channelTransacted
which, if true
, tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback (depending on the outcome), with an exception signaling a rollback.
Another signal is to provide an external transaction with one of Spring’s PlatformTransactionManager
implementations as a context for the ongoing operation.
If there is already a transaction in progress when the framework is sending or receiving a message, and the channelTransacted
flag is true
, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the channelTransacted
flag is false
, no transaction semantics apply to the messaging operation (it is auto-acked).
channelTransacted
标志位是配置时间设置。它在创建 AMQP 组件时会声明和处理一次,通常在应用程序启动时。从原则上讲,外部事务更动态,因为系统会在运行时响应当前线程状态。然而,在实践中,当事务以声明性方式分层到应用程序时,它通常也是一个配置设置。
The channelTransacted
flag is a configuration time setting.
It is declared and processed once when the AMQP components are created, usually at application startup.
The external transaction is more dynamic in principle because the system responds to the current thread state at runtime.
However, in practice, it is often also a configuration setting, when the transactions are layered onto an application declaratively.
针对 RabbitTemplate
使用同步用例时,外部事务由调用者提供,可以根据喜好以声明性方式或命令性方式提供(常见的 Spring 事务模型)。以下示例展示了声明性方式(通常较受青睐,因为它是非侵入性的),其中该模板已配置为 channelTransacted=true
:
For synchronous use cases with RabbitTemplate
, the external transaction is provided by the caller, either declaratively or imperatively according to taste (the usual Spring transaction model).
The following example shows a declarative approach (usually preferred because it is non-invasive), where the template has been configured with channelTransacted=true
:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在上一个示例中,String
负载会被接收、转换,并以消息正文的形式发送到一个标记为 @Transactional
的方法内部。如果数据库处理过程出现异常,入站消息会被返回到代理,且不会发送出站消息。这适用于事务方法链中的任何操作(RabbitTemplate
,除非,例如,Channel
被直接处理为提前提交该事务)。
In the preceding example, a String
payload is received, converted, and sent as a message body inside a method marked as @Transactional
.
If the database processing fails with an exception, the incoming message is returned to the broker, and the outgoing message is not sent.
This applies to any operations with the RabbitTemplate
inside a chain of transactional methods (unless, for instance, the Channel
is directly manipulated to commit the transaction early).
针对于 SimpleMessageListenerContainer
使用异步用例时,如果需要外部事务,则务必在设置侦听器时由容器请求该事务。为了指示需要外部事务,用户会在配置时向容器提供 PlatformTransactionManager
实现。以下示例展示了如何执行此操作:
For asynchronous use cases with SimpleMessageListenerContainer
, if an external transaction is needed, it has to be requested by the container when it sets up the listener.
To signal that an external transaction is required, the user provides an implementation of PlatformTransactionManager
to the container when it is configured.
The following example shows how to do so:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在上一个示例中,事务管理器被添加为从其他 Bean 定义(未显示)注入的依赖,并且还将 channelTransacted
标志位设置为 true
。其效果是,如果侦听器出现异常,该事务会被回滚,并且消息也会返回到代理。重要的是,如果事务提交失败(例如,由于数据库约束错误或连接问题),则 AMQP 事务也会回滚,并且该消息会被返回到代理。这有时被称为“尽量而为 1 阶段提交
”,并且是用于进行可靠消息传递的一种非常有效的方法。如果在上一个示例中将 channelTransacted
标志位设置为 false
(默认值),则仍然会为监听器提供外部事务,但所有消息传递操作都将自动确认,因此其效果是即使业务操作回滚,也会提交消息传递操作。
In the preceding example, the transaction manager is added as a dependency injected from another bean definition (not shown), and the channelTransacted
flag is also set to true
.
The effect is that if the listener fails with an exception, the transaction is rolled back, and the message is also returned to the broker.
Significantly, if the transaction fails to commit (for example, because of
a database constraint error or connectivity problem), the AMQP transaction is also rolled back, and the message is returned to the broker.
This is sometimes known as a “Best Efforts 1 Phase Commit”, and is a very powerful pattern for reliable messaging.
If the channelTransacted
flag was set to false
(the default) in the preceding example, the external transaction would still be provided for the listener, but all messaging operations would be auto-acked, so the effect is to commit the messaging operations even on a rollback of the business operation.
Conditional Rollback
在版本 1.6.6 之前,当使用外部事务管理器(例如 JDBC)时,向容器的 transactionAttribute
添加回滚规则不会产生任何效果。异常始终会回滚事务。
Prior to version 1.6.6, adding a rollback rule to a container’s transactionAttribute
when using an external transaction manager (such as JDBC) had no effect.
Exceptions always rolled back the transaction.
此外,在容器的建议链中使用 transaction advice 时,有条件回滚不太有用,因为所有侦听器异常都包含在 ListenerExecutionFailedException
中。
Also, when using a transaction advice in the container’s advice chain, conditional rollback was not very useful, because all listener exceptions are wrapped in a ListenerExecutionFailedException
.
第一个问题已得到修正,现在可以正常地应用这些规则。此外,现在提供了 ListenerFailedRuleBasedTransactionAttribute
。它是 RuleBasedTransactionAttribute
的一个子类,唯一的区别在于它感知 ListenerExecutionFailedException
,并使用此类异常的原因应用规则。此事务属性可以直接在容器中使用,或通过事务忠告使用。
The first problem has been corrected, and the rules are now applied properly.
Further, the ListenerFailedRuleBasedTransactionAttribute
is now provided.
It is a subclass of RuleBasedTransactionAttribute
, with the only difference being that it is aware of the ListenerExecutionFailedException
and uses the cause of such exceptions for the rule.
This transaction attribute can be used directly in the container or through a transaction advice.
以下示例使用此规则:
The following example uses this rule:
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
A note on Rollback of Received Messages
AMQP 事务仅适用于发送到代理的消息和确认。因此,当 Spring 事务回滚且已收到消息时,Spring AMQP 不仅要回滚事务,还必须手动拒绝消息(类似 nack,但这并不是规范称之为的内容)。对消息拒绝采取的操作与事务无关,而取决于 defaultRequeueRejected
属性(默认值:true
)。有关拒绝失败消息的更多信息,请参阅 Message Listeners and the Asynchronous Case。
AMQP transactions apply only to messages and acks sent to the broker.
Consequently, when there is a rollback of a Spring transaction and a message has been received, Spring AMQP has to not only rollback the transaction but also manually reject the message (sort of a nack, but that is not what the specification calls it).
The action taken on message rejection is independent of transactions and depends on the defaultRequeueRejected
property (default: true
).
For more information about rejecting failed messages, see Message Listeners and the Asynchronous Case.
有关 RabbitMQ 事务及其限制的详细信息,请参阅 RabbitMQ Broker Semantics。
For more information about RabbitMQ transactions and their limitations, see RabbitMQ Broker Semantics.
在 RabbitMQ 2.7.0 之前,此类消息(以及在信道关闭或中止时未确认的消息)会在 Rabbit 代理上进入队列的末尾。自 2.7.0 起,被拒绝的消息会进入队列的开头,类似于 JMS 回滚的消息。 |
Prior to RabbitMQ 2.7.0, such messages (and any that are unacked when a channel is closed or aborts) went to the back of the queue on a Rabbit broker. Since 2.7.0, rejected messages go to the front of the queue, in a similar manner to JMS rolled back messages. |
以前,事务回滚时的消息重新排队在本地事务和提供 |
Previously, message requeue on transaction rollback was inconsistent between local transactions and when a |
Using RabbitTransactionManager
RabbitTransactionManager 是在外部事务中执行 Rabbit 操作并与其同步的替代方案。此事务管理器是 PlatformTransactionManager
接口的实现,并且应与单个 Rabbit ConnectionFactory
一起使用。
The RabbitTransactionManager is an alternative to executing Rabbit operations within, and synchronized with, external transactions.
This transaction manager is an implementation of the PlatformTransactionManager
interface and should be used with a single Rabbit ConnectionFactory
.
此策略无法提供 XA 事务——例如,无法在消息传递和数据库访问之间共享事务。
This strategy is not able to provide XA transactions — for example, in order to share transactions between messaging and database access.
使用 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
应用程序代码需要检索事务性 Rabbit 资源,而不是使用 Connection.createChannel()
标准调用和后续信道创建。在使用 Spring AMQP 的 RabbitTemplate 时,它将自动检测线程绑定信道并自动参与其中事务。
Application code is required to retrieve the transactional Rabbit resources through ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
instead of a standard Connection.createChannel()
call with subsequent channel creation.
When using Spring AMQP’s RabbitTemplate, it will autodetect a thread-bound Channel and automatically participate in its transaction.
借助 Java 配置,你可以使用以下 Bean 来设置一个新的 RabbitTransactionManager:
With Java Configuration, you can setup a new RabbitTransactionManager by using the following bean:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果你更喜欢 XML 配置,可以在 XML 应用程序上下文文件中声明以下 Bean:
If you prefer XML configuration, you can declare the following bean in your XML Application Context file:
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
Transaction Synchronization
将 RabbitMQ 事务与其他某些事务(例如 DBMS 事务)同步提供“尽力而为的一阶段提交”语义。RabbitMQ 事务在事务同步的完成后的阶段中可能无法提交。这是由 spring-tx
基础结构记录为一个错误,但没有将异常抛转给调用代码。从版本 2.3.10 开始,你可以在事务在处理了事务的同一线程上提交后,调用 ConnectionUtils.checkAfterCompletion()
。如果没有发生异常,它将简单地返回;否则它将抛出一个 AfterCompletionFailedException
,其中将有一个属性代表完成的同步状态。
Synchronizing a RabbitMQ transaction with some other (e.g. DBMS) transaction provides "Best Effort One Phase Commit" semantics.
It is possible that the RabbitMQ transaction fails to commit during the after completion phase of transaction synchronization.
This is logged by the spring-tx
infrastructure as an error, but no exception is thrown to the calling code.
Starting with version 2.3.10, you can call ConnectionUtils.checkAfterCompletion()
after the transaction has committed on the same thread that processed the transaction.
It will simply return if no exception occurred; otherwise it will throw an AfterCompletionFailedException
which will have a property representing the synchronization status of the completion.
通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)
来启用此功能;这是一个全局标志,适用于所有线程。
Enable this feature by calling ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)
; this is a global flag and applies to all threads.