弹性:从错误和 Broker 故障中恢复
Spring AMQP 提供的一些关键(也是最受欢迎的)高级功能与协议错误或 Broker 故障时的恢复和自动重新连接有关。
我们已经在本指南中看到了所有相关组件,但在这里将它们整合在一起并单独列出功能和恢复场景会有所帮助。
主要的重新连接功能由 CachingConnectionFactory
本身启用。
使用 RabbitAdmin
自动声明功能也通常很有益。
此外,如果您关心保证交付,您可能还需要在 RabbitTemplate
和 SimpleMessageListenerContainer
中使用 channelTransacted
标志,并在 SimpleMessageListenerContainer
中使用 AcknowledgeMode.AUTO
(如果您自己进行确认,则为手动)。
交换机、队列和绑定的自动声明
RabbitAdmin
组件可以在启动时声明交换机、队列和绑定。
它通过 ConnectionListener
延迟执行此操作。
因此,如果 Broker 在启动时不存在,也无关紧要。
第一次使用 Connection
(例如,
通过发送消息)时,监听器会触发,并且应用管理功能。
在监听器中执行自动声明的另一个好处是,如果连接因任何原因(例如,
Broker 宕机、网络故障等)断开,它们会在连接重新建立时再次应用。
以这种方式声明的队列必须具有固定的名称——无论是显式声明的还是框架为 |
自动声明仅在 CachingConnectionFactory
缓存模式为 CHANNEL
(默认值)时执行。
存在此限制是因为排他队列和自动删除队列绑定到连接。
从 2.2.2 版本开始,RabbitAdmin
将检测 DeclarableCustomizer
类型的 bean,并在实际处理声明之前应用该函数。
例如,这对于在框架中获得一流支持之前设置新参数(属性)非常有用。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
这在不提供对 Declarable
bean 定义直接访问的项目中也很有用。
另请参阅 RabbitMQ 自动连接/拓扑恢复。
同步操作中的故障和重试选项
如果您在使用 RabbitTemplate
(例如)的同步序列中失去与 Broker 的连接,Spring AMQP 会抛出 AmqpException
(通常但并非总是 AmqpIOException
)。
我们不会试图隐藏问题的事实,因此您必须能够捕获并响应异常。
如果您怀疑连接已丢失(并且不是您的错),最简单的做法是再次尝试操作。
您可以手动执行此操作,或者您可以考虑使用 Spring Retry 来处理重试(命令式或声明式)。
Spring Retry 提供了一些 AOP 拦截器和极大的灵活性来指定重试参数(尝试次数、异常类型、退避算法等)。
Spring AMQP 还提供了一些便利的工厂 bean,用于以方便的形式创建 Spring Retry 拦截器,以用于 AMQP 用例,并带有强类型回调接口,您可以用来实现自定义恢复逻辑。
有关更多详细信息,请参阅 StatefulRetryOperationsInterceptor
和 StatelessRetryOperationsInterceptor
的 Javadoc 和属性。
如果不存在事务,或者事务在重试回调中启动,则无状态重试是合适的。
请注意,无状态重试比有状态重试更容易配置和分析,但如果存在必须回滚或肯定会回滚的正在进行的事务,则通常不适用。
事务中间断开连接应该与回滚具有相同的效果。
因此,对于在堆栈更高层启动事务的重新连接,有状态重试通常是最佳选择。
有状态重试需要一种机制来唯一标识消息。
最简单的方法是让发送方在 MessageId
消息属性中放置一个唯一值。
提供的消息转换器提供了执行此操作的选项:您可以将 createMessageIds
设置为 true
。
否则,您可以将 MessageKeyGenerator
实现注入到拦截器中。
键生成器必须为每条消息返回一个唯一键。
在 2.0 版本之前,提供了 MissingMessageIdAdvice
。
它允许没有 messageId
属性的消息恰好重试一次(忽略重试设置)。
此建议不再提供,因为与 spring-retry
1.2 版本一起,其功能已内置到拦截器和消息监听器容器中。
为了向后兼容,默认情况下,带有 null 消息 ID 的消息被消费者视为致命(消费者停止)(一次重试后)。
要复制 |
从 1.3 版本开始,提供了一个构建器 API,以帮助使用 Java(在 @Configuration
类中)组装这些拦截器。
以下示例展示了如何执行此操作:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只能以这种方式配置重试功能的一个子集。
更高级的功能需要将 RetryTemplate
配置为 Spring bean。
有关可用策略及其配置的完整信息,请参阅 {spring-retry-java-docs}[Spring Retry Javadoc]。
批量监听器的重试
不建议为批量监听器配置重试,除非批量是由生产者在一个记录中创建的。 有关消费者和生产者创建的批量的更多信息,请参阅 批量消息。 对于消费者创建的批量,框架不知道批量中的哪条消息导致了故障,因此在重试耗尽后无法恢复。 对于生产者创建的批量,由于实际上只有一条消息失败,因此可以恢复整个消息。 应用程序可能希望通知自定义恢复器故障发生在批量的哪个位置,也许通过设置抛出异常的索引属性。
批量监听器的重试恢复器必须实现 MessageBatchRecoverer
。
消息监听器和异步情况
如果 MessageListener
因业务异常而失败,则异常由消息监听器容器处理,然后该容器返回监听另一条消息。
如果故障是由连接断开(而不是业务异常)引起的,则为监听器收集消息的消费者必须被取消并重新启动。
SimpleMessageListenerContainer
无缝地处理此问题,并留下日志说明监听器正在重新启动。
实际上,它会无休止地循环,尝试重新启动消费者。
只有当消费者行为非常糟糕时,它才会放弃。
一个副作用是,如果 Broker 在容器启动时宕机,它会一直尝试直到可以建立连接。
业务异常处理,与协议错误和连接断开不同,可能需要更多的思考和一些自定义配置,尤其是在使用事务或容器确认时。
在 2.8.x 之前,RabbitMQ 没有死信行为的定义。
因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无限次重新投递。
为了限制客户端的重新投递次数,一个选择是在监听器的建议链中使用 StatefulRetryOperationsInterceptor
。
拦截器可以有一个恢复回调,该回调实现自定义死信操作——任何适合您特定环境的操作。
另一个替代方法是将容器的 defaultRequeueRejected
属性设置为 false
。
这会导致所有失败的消息都被丢弃。
当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息投递到死信交换机。
或者,您可以抛出 AmqpRejectAndDontRequeueException
。
这样做可以防止消息重新排队,无论 defaultRequeueRejected
属性的设置如何。
从 2.1 版本开始,引入了 ImmediateRequeueAmqpException
以执行完全相反的逻辑:无论 defaultRequeueRejected
属性的设置如何,消息都将被重新排队。
通常,这两种技术结合使用。
您可以在建议链中使用 StatefulRetryOperationsInterceptor
和抛出 AmqpRejectAndDontRequeueException
的 MessageRecoverer
。
当所有重试都已耗尽时,会调用 MessageRecover
。
RejectAndDontRequeueRecoverer
正是这样做的。
默认的 MessageRecoverer
消耗错误消息并发出 WARN
消息。
从 1.3 版本开始,提供了一个新的 RepublishMessageRecoverer
,允许在重试耗尽后发布失败的消息。
当恢复器消耗最终异常时,消息被确认并且不会被 Broker 发送到死信交换机(如果已配置)。
当 |
以下示例展示了如何将 RepublishMessageRecoverer
设置为恢复器:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
RepublishMessageRecoverer
发布消息时,在消息头中包含附加信息,例如异常消息、堆栈跟踪、原始交换机和路由键。
可以通过创建子类并覆盖 additionalHeaders()
来添加附加头。
deliveryMode
(或任何其他属性)也可以在 additionalHeaders()
中更改,如以下示例所示:
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从 2.0.5 版本开始,如果堆栈跟踪过大,可能会被截断;这是因为所有头都必须适合单个帧。
默认情况下,如果堆栈跟踪会导致其他头可用的空间少于 20,000 字节(“余量”),它将被截断。
如果需要更多或更少的空间用于其他头,可以通过设置恢复器的 frameMaxHeadroom
属性来调整此值。
从 2.1.13、2.2.3 版本开始,异常消息包含在此计算中,并且将使用以下算法最大化堆栈跟踪量:
-
如果仅堆栈跟踪就超过限制,则异常消息头将被截断为 97 字节加上
…
,并且堆栈跟踪也被截断。 -
如果堆栈跟踪很小,消息将被截断(加上
…
)以适应可用字节(但堆栈跟踪本身内的消息将被截断为 97 字节加上…
)。
每当发生任何类型的截断时,原始异常都将被记录以保留完整信息。 在增强头之后执行评估,以便可以在表达式中使用异常类型等信息。
从 2.4.8 版本开始,错误交换机和路由键可以作为 SpEL 表达式提供,其中 Message
是评估的根对象。
从 2.3.3 版本开始,提供了一个新的子类 RepublishMessageRecovererWithConfirms
;它支持两种发布者确认样式,并在返回之前等待确认(如果未确认或消息被返回,则抛出异常)。
如果确认类型为 CORRELATED
,子类还将检测消息是否被返回并抛出 AmqpMessageReturnedException
;如果发布被否定确认,它将抛出 AmqpNackReceivedException
。
如果确认类型为 SIMPLE
,子类将调用通道上的 waitForConfirmsOrDie
方法。
有关确认和返回的更多信息,请参阅 发布者确认和返回。
从 2.1 版本开始,添加了 ImmediateRequeueMessageRecoverer
以抛出 ImmediateRequeueAmqpException
,该异常通知监听器容器重新排队当前失败的消息。
Spring Retry 的异常分类
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。
默认配置会重试所有异常。
鉴于用户异常被包装在 ListenerExecutionFailedException
中,我们需要确保分类检查异常原因。
默认分类器仅查看顶层异常。
自 Spring Retry 1.0.3 起,BinaryExceptionClassifier
有一个名为 traverseCauses
的属性(默认值:false
)。
当为 true
时,它会遍历异常原因,直到找到匹配项或没有原因。
要将此分类器用于重试,您可以使用带有最大尝试次数、Exception
实例的 Map
和布尔值 (traverseCauses
) 的构造函数创建 SimpleRetryPolicy
,并将此策略注入 RetryTemplate
。
通过 Broker 重试
从队列中死信的消息可以在从 DLX 重新路由后重新发布回此队列。
此重试行为通过 x-death
头在 Broker 端控制。
有关此方法的更多信息,请参阅官方 RabbitMQ 文档。
另一种方法是从应用程序手动将失败的消息重新发布回原始交换机。
从 4.0
版本开始,RabbitMQ Broker 不考虑客户端发送的 x-death
头。
本质上,客户端的任何 x-*
头都被忽略。
为了缓解 RabbitMQ Broker 的这种新行为,Spring AMQP 从 3.2 版本开始引入了 retry_count
头。
当此头不存在且服务器端 DLX 处于活动状态时,x-death.count
属性将映射到此头。
当失败的消息被手动重新发布以进行重试时,retry_count
头值必须手动递增。
有关更多信息,请参阅 MessageProperties.incrementRetryCount()
JavaDocs。
以下示例总结了通过 Broker 手动重试的算法:
@RabbitListener(queueNames = "some_queue")
public void rePublish(Message message) {
try {
// Process message
}
catch (Exception ex) {
Long retryCount = message.getMessageProperties().getRetryCount();
if (retryCount < 3) {
message.getMessageProperties().incrementRetryCount();
this.rabbitTemplate.send("", "some_queue", message);
}
else {
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
}
}