提供的通知类
除了提供应用 AOP 通知类的通用机制外,Spring Integration 还提供了以下开箱即用的通知实现:
-
RequestHandlerRetryAdvice
(在 重试通知 中描述) -
RequestHandlerCircuitBreakerAdvice
(在 熔断器通知 中描述) -
ExpressionEvaluatingRequestHandlerAdvice
(在 表达式通知 中描述) -
RateLimiterRequestHandlerAdvice
(在 限流器通知 中描述) -
CacheRequestHandlerAdvice
(在 缓存通知 中描述) -
ReactiveRequestHandlerAdvice
(在 反应式通知 中描述) -
ContextHolderRequestHandlerAdvice
(在 上下文持有者通知 中描述) -
LockRequestHandlerAdvice
(在 锁通知 中描述)
重试通知
重试通知 (o.s.i.handler.advice.RequestHandlerRetryAdvice
) 利用了 Spring Retry 项目提供的丰富重试机制。
spring-retry
的核心组件是 RetryTemplate
,它允许配置复杂的重试场景,包括 RetryPolicy
和 BackoffPolicy
策略(有多种实现)以及 RecoveryCallback
策略,用于确定重试耗尽时要采取的操作。
- 无状态重试
-
无状态重试是指重试活动完全在通知内部处理的情况。 线程暂停(如果配置如此)并重试操作。
- 有状态重试
-
有状态重试是指重试状态在通知内部管理,但会抛出异常,并且调用者重新提交请求的情况。 有状态重试的一个例子是,我们希望消息发起者(例如 JMS)负责重新提交,而不是在当前线程上执行。 有状态重试需要一些机制来检测重试的提交。
有关 spring-retry
的更多信息,请参阅 项目 Javadoc 和 Spring Batch 的参考文档,spring-retry
源于该项目。
默认的退避行为是不退避。 重试会立即尝试。 使用导致线程在尝试之间暂停的退避策略可能会导致性能问题,包括过多的内存使用和线程饥饿。 在高并发环境中,应谨慎使用退避策略。
配置重试通知
本节中的示例使用以下始终抛出异常的 <service-activator>
:
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 简单无状态重试
-
默认的
RetryTemplate
有一个SimpleRetryPolicy
,它会尝试三次。 没有BackOffPolicy
,因此三次尝试是背靠背进行的,尝试之间没有延迟。 没有RecoveryCallback
,因此最终的结果是在最后一次失败的重试发生后将异常抛给调用者。 在 Spring Integration 环境中,这个最终异常可以通过使用入站端点上的error-channel
来处理。 以下示例使用RetryTemplate
并显示其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 带恢复的简单无状态重试
-
以下示例在前面的示例中添加了一个
RecoveryCallback
,并使用ErrorMessageSendingRecoverer
将ErrorMessage
发送到一个通道:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- 带自定义策略和恢复的无状态重试
-
为了更复杂,我们可以为通知提供一个自定义的
RetryTemplate
。 此示例继续使用SimpleRetryPolicy
,但将尝试次数增加到四次。 它还添加了一个ExponentialBackoffPolicy
,其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 秒(总共四次尝试)。 以下清单显示了示例及其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- 无状态重试的命名空间支持
-
从 4.0 版本开始,由于重试通知的命名空间支持,前面的配置可以大大简化,如以下示例所示:
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
在前面的示例中,通知被定义为顶级 bean,以便它可以在多个
request-handler-advice-chain
实例中使用。 您也可以直接在链中定义通知,如以下示例所示:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
<int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:retry-advice>
</int:request-handler-advice-chain>
</int:service-activator>
一个 <handler-retry-advice>
可以有一个 <fixed-back-off>
或 <exponential-back-off>
子元素,或者没有子元素。
没有子元素的 <handler-retry-advice>
不使用退避。
如果没有 recovery-channel
,则在重试耗尽时抛出异常。
命名空间只能用于无状态重试。
对于更复杂的环境(自定义策略等),请使用正常的 <bean>
定义。
- 带恢复的简单有状态重试
-
要使重试有状态,我们需要为通知提供一个
RetryStateGenerator
实现。 此类用于将消息标识为重新提交,以便RetryTemplate
可以确定此消息的当前重试状态。 框架提供了一个SpelExpressionRetryStateGenerator
,它通过使用 SpEL 表达式来确定消息标识符。 此示例再次使用默认策略(三次尝试,无退避)。 与无状态重试一样,这些策略可以自定义。 以下清单显示了示例及其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果您将前面的示例与无状态示例进行比较,您可以看到,对于有状态重试,每次失败时都会将异常抛给调用者。
- 重试的异常分类
-
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。 默认配置对所有异常进行重试,异常分类器会查看顶级异常。 如果您将其配置为只对
MyException
重试,而您的应用程序抛出了SomeOtherException
,其原因是MyException
,则不会发生重试。从 Spring Retry 1.0.3 开始,
BinaryExceptionClassifier
有一个名为traverseCauses
的属性(默认为false
)。 当为true
时,它会遍历异常原因,直到找到匹配项或遍历完所有原因。 要将此分类器用于重试,请使用SimpleRetryPolicy
,该策略使用接受最大尝试次数、Exception
对象Map
和traverseCauses
布尔值的构造函数创建。 然后,您可以将此策略注入到RetryTemplate
中。
在这种情况下需要 traverseCauses
,因为用户异常可能被包装在 MessagingException
中。
熔断器通知
熔断器模式的总体思想是,如果服务当前不可用,则不要浪费时间(和资源)尝试使用它。
o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
实现了此模式。
当熔断器处于关闭状态时,端点会尝试调用服务。
如果连续尝试失败达到一定次数,熔断器会进入打开状态。
当熔断器处于打开状态时,新请求会“快速失败
”,并且在一段时间过期之前不会尝试调用服务。
当该时间过期后,熔断器被设置为半开状态。 在此状态下,即使一次尝试失败,熔断器也会立即进入打开状态。 如果尝试成功,熔断器会进入关闭状态,在这种情况下,在配置的连续失败次数再次发生之前,它不会再次进入打开状态。 任何成功的尝试都会将状态重置为零失败,以确定熔断器何时可能再次进入打开状态。
通常,此通知可能用于外部服务,其中可能需要一些时间才能失败(例如尝试建立网络连接时超时)。
RequestHandlerCircuitBreakerAdvice
有两个属性:threshold
和 halfOpenAfter
。
threshold
属性表示熔断器进入打开状态所需的连续失败次数。
它默认为 5
。
halfOpenAfter
属性表示熔断器在尝试另一个请求之前等待的时间,从上次失败算起。
默认值为 1000 毫秒。
以下示例配置了一个熔断器并显示了其 DEBUG
和 ERROR
输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为 2
,halfOpenAfter
设置为 12
秒。
每 5 秒到达一个新请求。
前两次尝试调用了服务。
第三次和第四次失败,并抛出指示熔断器打开的异常。
第五次请求被尝试,因为该请求在上次失败后 15 秒。
第六次尝试立即失败,因为熔断器立即打开。
表达式评估通知
最后提供的通知类是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
。
此通知比其他两个通知更通用。
它提供了一种机制,用于评估发送到端点的原始入站消息上的表达式。
成功或失败后,可以评估单独的表达式。
可选地,包含评估结果和输入消息的消息可以发送到消息通道。
此通知的典型用例可能是与 <ftp:outbound-channel-adapter/>
一起使用,也许是将文件移动到成功传输的目录,或者在失败时移动到另一个目录:
该通知具有在成功时设置表达式、在失败时设置表达式以及相应的通道的属性。
对于成功的情况,发送到 successChannel
的消息是一个 AdviceMessage
,其有效负载是表达式评估的结果。
一个额外的属性,名为 inputMessage
,包含发送到处理程序的原始消息。
发送到 failureChannel
的消息(当处理程序抛出异常时)是一个 ErrorMessage
,其有效负载为 MessageHandlingExpressionEvaluatingAdviceException
。
像所有 MessagingException
实例一样,此有效负载具有 failedMessage
和 cause
属性,以及一个名为 evaluationResult
的附加属性,其中包含表达式评估的结果。
从 5.1.3 版本开始,如果配置了通道但未提供表达式,则使用默认表达式评估消息的 |
当在通知范围内抛出异常时,默认情况下,在评估任何 failureExpression
后,该异常会抛给调用者。
如果您希望抑制抛出异常,请将 trapException
属性设置为 true
。
以下通知显示了如何使用 Java DSL 配置 advice
:
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
限流器通知
限流器通知 (RateLimiterRequestHandlerAdvice
) 允许确保端点不会因请求而过载。
当超出限流时,请求将进入阻塞状态。
此通知的典型用例可能是外部服务提供商不允许每分钟超过 n
个请求。
RateLimiterRequestHandlerAdvice
实现完全基于 Resilience4j 项目,并且需要注入 RateLimiter
或 RateLimiterConfig
。
也可以使用默认值和/或自定义名称进行配置。
以下示例配置了一个每 1 秒一个请求的限流器通知:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存通知
从 5.2 版本开始,引入了 CacheRequestHandlerAdvice
。
它基于 Spring Framework 中的缓存抽象,并与 @Caching
注解族提供的概念和功能保持一致。
内部逻辑基于 CacheAspectSupport
扩展,其中围绕 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
方法进行缓存操作的代理,并以请求 Message<?>
作为参数。
此通知可以使用 SpEL 表达式或 Function
配置以评估缓存键。
请求 Message<?>
可作为 SpEL 评估上下文的根对象,或作为 Function
输入参数。
默认情况下,请求消息的 payload
用于缓存键。
CacheRequestHandlerAdvice
必须配置 cacheNames
,当默认缓存操作是 CacheableOperation
时,或者配置一组任意的 CacheOperation
。
每个 CacheOperation
都可以单独配置或共享选项,例如 CacheManager
、CacheResolver
和 CacheErrorHandler
,可以从 CacheRequestHandlerAdvice
配置中重用。
此配置功能类似于 Spring Framework 的 @CacheConfig
和 @Caching
注解组合。
如果未提供 CacheManager
,则默认从 BeanFactory
中的 CacheAspectSupport
解析单个 bean。
以下示例配置了两个具有不同缓存操作集的通知:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}