延迟器

延迟器是一个简单的端点,它允许消息流被延迟一定的时间间隔。 当消息被延迟时,原始发送者不会阻塞。 相反,延迟的消息会通过 org.springframework.scheduling.TaskScheduler 实例进行调度,在延迟时间过后发送到输出通道。 这种方法即使对于相当长的延迟也具有可伸缩性,因为它不会导致大量阻塞的发送者线程。 相反,在典型情况下,线程池用于实际执行消息的释放。 本节包含配置延迟器的几个示例。

配置延迟器

<delayer> 元素用于延迟两个消息通道之间的消息流。 与其他端点一样,您可以提供 'input-channel' 和 'output-channel' 属性,但延迟器还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),它们决定了每条消息应该延迟的毫秒数。 以下示例将所有消息延迟三秒:

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果您需要为每条消息确定延迟,您还可以使用 'expression' 属性提供 SpEL 表达式,如下面的表达式所示:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前面的示例中,三秒延迟仅在给定入站消息的表达式评估为 null 时适用。 如果您只想对具有有效表达式评估结果的消息应用延迟,您可以使用 0 的 'default-delay'(默认值)。 对于任何延迟为 0(或更少)的消息,消息会立即在调用线程上发送。

XML 解析器使用 <beanName>.messageGroupId 作为消息组 ID。

延迟处理器支持表示毫秒间隔的表达式评估结果(任何 ObjecttoString() 方法产生一个可以解析为 Long 的值)以及表示绝对时间的 java.util.Date 实例。 在第一种情况下,毫秒是从当前时间开始计算的(例如,值 5000 将使消息从延迟器接收到消息的时间起延迟至少五秒)。 对于 Date 实例,消息不会在 Date 对象表示的时间之前释放。 非正延迟或过去日期会导致不延迟。 相反,它会直接在原始发送者的线程上发送到输出通道。 如果表达式评估结果不是 Date 且无法解析为 Long,则应用默认延迟(如果有的话——默认值为 0)。

表达式评估可能由于各种原因抛出评估异常,包括无效表达式或其他条件。 默认情况下,此类异常会被忽略(尽管以 DEBUG 级别记录),并且延迟器会回退到默认延迟(如果有的话)。 您可以通过设置 ignore-expression-failures 属性来修改此行为。 默认情况下,此属性设置为 true,延迟器行为如前所述。 但是,如果您不想忽略表达式评估异常并将其抛给延迟器的调用者,请将 ignore-expression-failures 属性设置为 false

在前面的示例中,延迟表达式指定为 headers['delay']。 这是 SpEL Indexer 语法,用于访问 Map 元素(MessageHeaders 实现了 Map)。 它调用:headers.get("delay")。 对于简单的 map 元素名称(不包含 '.'),您也可以使用 SpEL “dot accessor” 语法,其中前面显示的 header 表达式可以指定为 headers.delay。 但是,如果 header 缺失,则会获得不同的结果。 在第一种情况下,表达式评估为 null。 第二个结果类似于以下内容:

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果 header 有可能被省略并且您希望回退到默认延迟,通常使用索引器语法而不是点属性访问器语法更高效(并且推荐),因为检测 null 比捕获异常更快。

延迟器委托给 Spring 的 TaskScheduler 抽象实例。 延迟器使用的默认调度器是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler 实例。 请参阅 配置任务调度器。 如果您想委托给不同的调度器,可以通过延迟器元素的 'scheduler' 属性提供引用,如下例所示:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>

如果配置外部 ThreadPoolTaskScheduler,可以在此属性上设置 waitForTasksToCompleteOnShutdown = true。 它允许在应用程序关闭时成功完成已处于执行状态(释放消息)的“延迟”任务。 在 Spring Integration 2.2 之前,此属性在 <delayer> 元素上可用,因为 DelayHandler 可以在后台创建自己的调度器。 自 2.2 起,延迟器需要一个外部调度器实例,并且 waitForTasksToCompleteOnShutdown 已被删除。 您应该使用调度器自己的配置。

ThreadPoolTaskScheduler 有一个 errorHandler 属性,可以注入 org.springframework.util.ErrorHandler 的某些实现。 此处理程序允许处理来自发送延迟消息的计划任务线程的 Exception。 默认情况下,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,您可以在日志中看到堆栈跟踪。 您可能需要考虑使用 org.springframework.integration.channel.MessagePublishingErrorHandler,它将 ErrorMessage 发送到 error-channel,无论是来自失败消息的 header 还是发送到默认的 error-channel。 此错误处理在事务回滚(如果存在)之后执行。 请参阅 delayer-release-failures

延迟器和消息存储

DelayHandler 将延迟的消息持久化到提供的 MessageStore 中的消息组中。 ('groupId' 基于 <delayer> 元素的必需 'id' 属性。 另请参阅 DelayHandler.setMessageGroupId(String)。) 延迟的消息在 DelayHandler 将消息发送到 output-channel 之前,立即由计划任务从 MessageStore 中删除。 如果提供的 MessageStore 是持久化的(例如 JdbcMessageStore),它提供了在应用程序关闭时不会丢失消息的能力。 应用程序启动后,DelayHandlerMessageStore 中的消息组读取消息,并根据消息的原始到达时间重新调度它们(如果延迟是数字)。 对于延迟 header 是 Date 的消息,该 Date 在重新调度时使用。 如果延迟消息在 MessageStore 中保留的时间超过其“延迟”,它将在启动后立即发送。 messageGroupId 是必需的,不能依赖于可以生成的 DelayHandler bean 名称。 这样,在应用程序重新启动后,DelayHandler 可能会获得一个新的生成的 bean 名称。 因此,延迟的消息可能会因为其组不再由应用程序管理而从重新调度中丢失。

<delayer> 可以通过两个互斥的元素之一进行丰富:<transactional><advice-chain>。 这些 AOP 建议的 List 应用于代理的内部 DelayHandler.ReleaseMessageHandler,它负责在延迟后,在计划任务的 Thread 上释放消息。 例如,当消息流下游抛出异常且 ReleaseMessageHandler 的事务回滚时,可以使用它。 在这种情况下,延迟的消息保留在持久化的 MessageStore 中。 您可以在 <advice-chain> 中使用任何自定义的 org.aopalliance.aop.Advice 实现。 <transactional> 元素定义了一个简单的建议链,其中只包含事务建议。 以下示例展示了 <delayer> 中的 advice-chain

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

DelayHandler 可以作为 JMX MBean 导出,具有托管操作(getDelayedMessageCountreschedulePersistedMessages),它允许在运行时重新调度延迟的持久化消息——例如,如果 TaskScheduler 之前已停止。 这些操作可以通过 Control Bus 命令调用,如下例所示:

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);

有关消息存储、JMX 和控制总线的更多信息,请参阅 系统管理

从版本 5.3.7 开始,如果消息存储到 MessageStore 时事务处于活动状态,则释放任务将在 TransactionSynchronization.afterCommit() 回调中调度。 这对于防止竞态条件是必要的,因为竞态条件下,调度释放可能会在事务提交之前运行,并且找不到消息。 在这种情况下,消息将在延迟之后或事务提交之后释放,以较晚者为准。

释放失败

从版本 5.0.8 开始,延迟器有两个新属性:

  • maxAttempts(默认 5)

  • retryDelay(默认 1 秒)

当消息被释放时,如果下游流失败,释放将在 retryDelay 之后尝试。 如果达到 maxAttempts,消息将被丢弃(除非释放是事务性的,在这种情况下,消息将保留在存储中,但不再安排释放,直到应用程序重新启动,或调用 reschedulePersistedMessages() 方法,如上所述)。

此外,您可以配置 delayedMessageErrorChannel;当释放失败时,ErrorMessage 将发送到该通道,其中异常作为有效负载,并具有 originalMessage 属性。 ErrorMessage 包含一个 header IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含当前计数。

如果错误流消耗错误消息并正常退出,则不采取进一步操作;如果释放是事务性的,则事务将提交并从存储中删除消息。 如果错误流抛出异常,释放将重试最多 maxAttempts,如上所述。