事务

Spring Rabbit 框架支持在同步和异步用例中进行自动事务管理,并提供了多种不同的语义,这些语义可以像 Spring 事务的现有用户所熟悉的那样以声明方式选择。这使得许多(如果不是大多数)常见的消息传递模式易于实现。有两种方法可以向框架发出所需的事务语义信号。在 RabbitTemplateSimpleMessageListenerContainer 中,都有一个 channelTransacted 标志,如果为 true,则告诉框架使用事务性通道,并以提交或回滚(取决于结果)结束所有操作(发送或接收),异常表示回滚。另一个信号是提供一个外部事务,其中包含 Spring 的 PlatformTransactionManager 实现作为正在进行的操作的上下文。如果框架在发送或接收消息时已经有一个事务正在进行,并且 channelTransacted 标志为 true,则消息传递事务的提交或回滚将推迟到当前事务结束。如果 channelTransacted 标志为 false,则没有事务语义应用于消息传递操作(它是自动确认的)。channelTransacted 标志是一个配置时设置。它在创建 AMQP 组件时(通常在应用程序启动时)声明和处理一次。外部事务原则上更具动态性,因为系统在运行时响应当前线程状态。然而,在实践中,当事务以声明方式分层到应用程序上时,它也常常是一个配置设置。对于 RabbitTemplate 的同步用例,外部事务由调用者提供,可以根据喜好以声明方式或命令方式(通常的 Spring 事务模型)提供。以下示例显示了一种声明式方法(通常首选,因为它是非侵入性的),其中模板已配置为 channelTransacted=true

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,一个 String 有效负载在标记为 @Transactional 的方法内部被接收、转换并作为消息体发送。如果数据库处理因异常而失败,则传入消息将返回给代理,并且传出消息不会发送。这适用于 RabbitTemplate 在事务方法链中的任何操作(除非,例如,Channel 被直接操作以提前提交事务)。对于 SimpleMessageListenerContainer 的异步用例,如果需要外部事务,则必须由容器在设置侦听器时请求。为了表示需要外部事务,用户在配置容器时向容器提供 PlatformTransactionManager 的实现。以下示例显示了如何执行此操作:

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在前面的示例中,事务管理器作为从另一个 bean 定义(未显示)注入的依赖项添加,并且 channelTransacted 标志也设置为 true。其效果是,如果侦听器因异常而失败,则事务将回滚,并且消息也将返回给代理。重要的是,如果事务未能提交(例如,由于 数据库约束错误或连接问题),AMQP 事务也将回滚,并且消息将返回给代理。这有时被称为“尽力而为的 1 阶段提交”,并且是可靠消息传递的一个非常强大的模式。如果前面的示例中 channelTransacted 标志设置为 false(默认值),则侦听器仍将提供外部事务,但所有消息传递操作都将自动确认,因此其效果是在业务操作回滚时也提交消息传递操作。

有条件回滚

在 1.6.6 版本之前,当使用外部事务管理器(例如 JDBC)时,向容器的 transactionAttribute 添加回滚规则无效。异常总是回滚事务。

此外,当在容器的建议链中使用 {spring-framework-docs}/data-access/transaction/declarative.html[事务建议]时,有条件回滚并不是很有用,因为所有侦听器异常都被包装在 ListenerExecutionFailedException 中。

第一个问题已得到纠正,规则现在已正确应用。此外,现在提供了 ListenerFailedRuleBasedTransactionAttribute。它是 RuleBasedTransactionAttribute 的子类,唯一的区别是它知道 ListenerExecutionFailedException 并使用此类异常的原因作为规则。此事务属性可以直接在容器中使用,也可以通过事务建议使用。

以下示例使用此规则:

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

关于已接收消息回滚的说明

AMQP 事务仅适用于发送到代理的消息和确认。因此,当 Spring 事务回滚并且已接收到消息时,Spring AMQP 不仅必须回滚事务,还必须手动拒绝消息(有点像 nack,但规范不这样称呼它)。消息拒绝时采取的操作与事务无关,并取决于 defaultRequeueRejected 属性(默认值:true)。有关拒绝失败消息的更多信息,请参阅 消息侦听器和异步情况

有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ 代理语义

在 RabbitMQ 2.7.0 之前,此类消息(以及通道关闭或中止时任何未确认的消息)在 Rabbit 代理上会回到队列的末尾。自 2.7.0 起,被拒绝的消息会回到队列的开头,类似于 JMS 回滚的消息。

以前,事务回滚时消息重新入队在本地事务和提供 TransactionManager 时不一致。在前一种情况下,应用正常的重新入队逻辑(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(参见 消息侦听器和异步情况)。使用事务管理器时,消息在回滚时无条件重新入队。从 2.0 版本开始,行为一致,并且在两种情况下都应用正常的重新入队逻辑。要恢复到以前的行为,您可以将容器的 alwaysRequeueWithTxManagerRollback 属性设置为 true。参见 消息侦听器容器配置

使用 RabbitTransactionManager

{spring-amqp-java-docs}/rabbit/transaction/RabbitTransactionManager.html[RabbitTransactionManager] 是在外部事务中执行 Rabbit 操作并与之同步的替代方案。此事务管理器是 {spring-framework-java-docs}/transaction/PlatformTransactionManager.html[PlatformTransactionManager] 接口的实现,应与单个 Rabbit ConnectionFactory 一起使用。

此策略无法提供 XA 事务,例如,为了在消息传递和数据库访问之间共享事务。

应用程序代码需要通过 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 检索事务性 Rabbit 资源,而不是通过标准的 Connection.createChannel() 调用以及随后的通道创建。当使用 Spring AMQP 的 {spring-amqp-java-docs}/rabbit/core/RabbitTemplate.html[RabbitTemplate] 时,它将自动检测线程绑定的通道并自动参与其事务。

通过 Java 配置,您可以使用以下 bean 设置新的 RabbitTransactionManager:

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果您更喜欢 XML 配置,可以在 XML 应用程序上下文文件中声明以下 bean:

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

事务同步

将 RabbitMQ 事务与某些其他(例如 DBMS)事务同步提供“尽力而为的单阶段提交”语义。在事务同步的完成阶段之后,RabbitMQ 事务可能会提交失败。这会被 spring-tx 基础设施记录为错误,但不会向调用代码抛出异常。从 2.3.10 版本开始,您可以在事务在处理事务的同一线程上提交后调用 ConnectionUtils.checkAfterCompletion()。如果没有发生异常,它将简单地返回;否则它将抛出 AfterCompletionFailedException,该异常将具有表示完成的同步状态的属性。

通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 来启用此功能;这是一个全局标志,适用于所有线程。