延迟器
延迟器是一个简单的端点,它允许消息流被延迟一定的时间间隔。
当消息被延迟时,原始发送者不会阻塞。
相反,延迟的消息会通过 org.springframework.scheduling.TaskScheduler
实例进行调度,在延迟时间过后发送到输出通道。
这种方法即使对于相当长的延迟也具有可伸缩性,因为它不会导致大量阻塞的发送者线程。
相反,在典型情况下,线程池用于实际执行消息的释放。
本节包含配置延迟器的几个示例。
配置延迟器
<delayer>
元素用于延迟两个消息通道之间的消息流。
与其他端点一样,您可以提供 'input-channel' 和 'output-channel' 属性,但延迟器还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),它们决定了每条消息应该延迟的毫秒数。
以下示例将所有消息延迟三秒:
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果您需要为每条消息确定延迟,您还可以使用 'expression' 属性提供 SpEL 表达式,如下面的表达式所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,三秒延迟仅在给定入站消息的表达式评估为 null 时适用。
如果您只想对具有有效表达式评估结果的消息应用延迟,您可以使用 0
的 'default-delay'(默认值)。
对于任何延迟为 0
(或更少)的消息,消息会立即在调用线程上发送。
XML 解析器使用 |
延迟处理器支持表示毫秒间隔的表达式评估结果(任何 |
表达式评估可能由于各种原因抛出评估异常,包括无效表达式或其他条件。
默认情况下,此类异常会被忽略(尽管以 DEBUG 级别记录),并且延迟器会回退到默认延迟(如果有的话)。
您可以通过设置 ignore-expression-failures
属性来修改此行为。
默认情况下,此属性设置为 true
,延迟器行为如前所述。
但是,如果您不想忽略表达式评估异常并将其抛给延迟器的调用者,请将 ignore-expression-failures
属性设置为 false
。
在前面的示例中,延迟表达式指定为
因此,如果 header 有可能被省略并且您希望回退到默认延迟,通常使用索引器语法而不是点属性访问器语法更高效(并且推荐),因为检测 null 比捕获异常更快。 |
延迟器委托给 Spring 的 TaskScheduler
抽象实例。
延迟器使用的默认调度器是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler
实例。
请参阅 配置任务调度器。
如果您想委托给不同的调度器,可以通过延迟器元素的 'scheduler' 属性提供引用,如下例所示:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置外部 |
|
延迟器和消息存储
DelayHandler
将延迟的消息持久化到提供的 MessageStore
中的消息组中。
('groupId' 基于 <delayer>
元素的必需 'id' 属性。
另请参阅 DelayHandler.setMessageGroupId(String)
。)
延迟的消息在 DelayHandler
将消息发送到 output-channel
之前,立即由计划任务从 MessageStore
中删除。
如果提供的 MessageStore
是持久化的(例如 JdbcMessageStore
),它提供了在应用程序关闭时不会丢失消息的能力。
应用程序启动后,DelayHandler
从 MessageStore
中的消息组读取消息,并根据消息的原始到达时间重新调度它们(如果延迟是数字)。
对于延迟 header 是 Date
的消息,该 Date
在重新调度时使用。
如果延迟消息在 MessageStore
中保留的时间超过其“延迟”,它将在启动后立即发送。
messageGroupId
是必需的,不能依赖于可以生成的 DelayHandler
bean 名称。
这样,在应用程序重新启动后,DelayHandler
可能会获得一个新的生成的 bean 名称。
因此,延迟的消息可能会因为其组不再由应用程序管理而从重新调度中丢失。
<delayer>
可以通过两个互斥的元素之一进行丰富:<transactional>
和 <advice-chain>
。
这些 AOP 建议的 List
应用于代理的内部 DelayHandler.ReleaseMessageHandler
,它负责在延迟后,在计划任务的 Thread
上释放消息。
例如,当消息流下游抛出异常且 ReleaseMessageHandler
的事务回滚时,可以使用它。
在这种情况下,延迟的消息保留在持久化的 MessageStore
中。
您可以在 <advice-chain>
中使用任何自定义的 org.aopalliance.aop.Advice
实现。
<transactional>
元素定义了一个简单的建议链,其中只包含事务建议。
以下示例展示了 <delayer>
中的 advice-chain
:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler
可以作为 JMX MBean
导出,具有托管操作(getDelayedMessageCount
和 reschedulePersistedMessages
),它允许在运行时重新调度延迟的持久化消息——例如,如果 TaskScheduler
之前已停止。
这些操作可以通过 Control Bus
命令调用,如下例所示:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅 系统管理。 |
从版本 5.3.7 开始,如果消息存储到 MessageStore
时事务处于活动状态,则释放任务将在 TransactionSynchronization.afterCommit()
回调中调度。
这对于防止竞态条件是必要的,因为竞态条件下,调度释放可能会在事务提交之前运行,并且找不到消息。
在这种情况下,消息将在延迟之后或事务提交之后释放,以较晚者为准。
释放失败
从版本 5.0.8 开始,延迟器有两个新属性:
-
maxAttempts
(默认 5) -
retryDelay
(默认 1 秒)
当消息被释放时,如果下游流失败,释放将在 retryDelay
之后尝试。
如果达到 maxAttempts
,消息将被丢弃(除非释放是事务性的,在这种情况下,消息将保留在存储中,但不再安排释放,直到应用程序重新启动,或调用 reschedulePersistedMessages()
方法,如上所述)。
此外,您可以配置 delayedMessageErrorChannel
;当释放失败时,ErrorMessage
将发送到该通道,其中异常作为有效负载,并具有 originalMessage
属性。
ErrorMessage
包含一个 header IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
,其中包含当前计数。
如果错误流消耗错误消息并正常退出,则不采取进一步操作;如果释放是事务性的,则事务将提交并从存储中删除消息。
如果错误流抛出异常,释放将重试最多 maxAttempts
,如上所述。