轮询器

本节描述了 Spring Integration 中轮询的工作原理。

轮询消费者

当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一:

实际实现取决于这些端点连接的通道类型。 连接到实现 org.springframework.messaging.SubscribableChannel 接口的通道的通道适配器会生成 EventDrivenConsumer 实例。 另一方面,连接到实现 org.springframework.messaging.PollableChannel 接口(例如 QueueChannel)的通道的通道适配器会生成 PollingConsumer 实例。

轮询消费者允许 Spring Integration 组件主动轮询消息,而不是以事件驱动的方式处理消息。

它们代表了许多消息传递场景中的一个关键横切关注点。 在 Spring Integration 中,轮询消费者基于 Gregor Hohpe 和 Bobby Woolf 所著的《企业集成模式》一书中描述的同名模式。 你可以在 该书的网站上找到该模式的描述。

有关轮询消费者配置的更多信息,请参阅 消息端点

可轮询消息源

Spring Integration 提供了轮询消费者模式的第二种变体。 当使用入站通道适配器时,这些适配器通常由 SourcePollingChannelAdapter 包装。 例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器 中描述的适配器配置了一个轮询器以定期检索消息。 因此,当组件配置了轮询器时,生成的实例属于以下类型之一:

这意味着轮询器在入站和出站消息传递场景中都会使用。 以下是一些使用轮询器的用例:

  • 轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务

  • 轮询内部(可轮询)消息通道

  • 轮询内部服务(例如重复执行 Java 类上的方法)

AOP 通知类可以应用于轮询器中的 advice-chain,例如用于启动事务的事务通知。 从 4.1 版开始,提供了 PollSkipAdvice。 轮询器使用触发器来确定下次轮询的时间。 PollSkipAdvice 可用于抑制(跳过)轮询,这可能是因为存在一些下游条件会阻止消息被处理。 要使用此建议,你必须为其提供 PollSkipStrategy 的实现。 从 4.2.5 版开始,提供了 SimplePollSkipStrategy。 要使用它,你可以将实例作为 bean 添加到应用程序上下文中,将其注入到 PollSkipAdvice 中,然后将其添加到轮询器的通知链中。 要跳过轮询,请调用 skipPolls()。 要恢复轮询,请调用 reset()。 4.2 版在此领域增加了更多灵活性。 请参阅 条件轮询器

本章旨在仅对轮询消费者及其如何适应消息通道(请参阅 消息通道)和通道适配器(请参阅 通道适配器)的概念提供一个高层次的概述。 有关消息端点(尤其是轮询消费者)的更多信息,请参阅 消息端点

延迟确认可轮询消息源

从 5.0.1 版开始,某些模块提供了 MessageSource 实现,这些实现支持将确认延迟到下游流完成(或将消息传递给另一个线程)为止。 这目前仅限于 AmqpMessageSourceKafkaMessageSource

对于这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头(请参阅 MessageHeaderAccessor API)会添加到消息中。 当与可轮询消息源一起使用时,头的 值是 AcknowledgmentCallback 的实例,如下例所示:

@FunctionalInterface
public interface AcknowledgmentCallback extends SimpleAcknowledgment {

    void acknowledge(Status status);

    @Override
    default void acknowledge() {
        acknowledge(Status.ACCEPT);
    }

    default boolean isAcknowledged() {
        return false;
    }


    default void noAutoAck() {
        throw new UnsupportedOperationException("You cannot disable auto acknowledgment with this implementation");
    }

    default boolean isAutoAck() {
        return true;
    }

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如 KafkaMessageSource)都支持 REJECT 状态。 它被视为与 ACCEPT 相同。

应用程序可以随时确认消息,如下例所示:

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果 MessageSource 连接到 SourcePollingChannelAdapter,当轮询器线程在下游流完成后返回到适配器时,适配器会检查确认是否已确认,如果尚未确认,则将其状态设置为 ACCEPT(如果流抛出异常,则设置为 REJECT)。 状态值在 AcknowledgmentCallback.Status 枚举中定义。

Spring Integration 提供了 MessageSourcePollingTemplate 来执行 MessageSource 的临时轮询。 当 MessageHandler 回调返回(或抛出异常)时,这也会负责在 AcknowledgmentCallback 上设置 ACCEPTREJECT。 以下示例展示了如何使用 MessageSourcePollingTemplate 进行轮询:

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在这两种情况(SourcePollingChannelAdapterMessageSourcePollingTemplate)下,你都可以通过调用回调上的 noAutoAck() 来禁用自动确认/拒绝。 如果你将消息传递给另一个线程并希望稍后确认,则可能会这样做。 并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。

消息源的条件轮询器

本节介绍如何使用条件轮询器。

背景

轮询器上 advice-chain 中的 Advice 对象会通知整个轮询任务(包括消息检索和处理)。 这些“环绕通知”方法无法访问轮询的任何上下文 — 只能访问轮询本身。 这对于将任务事务化或由于某些外部条件而跳过轮询等要求来说是很好的,如前所述。 如果我们希望根据轮询的 receive 部分的结果采取一些行动,或者如果我们想根据条件调整轮询器,该怎么办?对于这些情况,Spring Integration 提供了“智能”轮询。

“智能”轮询

5.3 版引入了 ReceiveMessageAdvice 接口。 advice-chain 中实现此接口的任何 Advice 对象仅应用于 receive() 操作 — MessageSource.receive()PollableChannel.receive(timeout)。 因此,它们只能应用于 SourcePollingChannelAdapterPollingConsumer。 此类实现以下方法:

  • beforeReceive(Object source) 此方法在 Object.receive() 方法之前调用。 它允许你检查和重新配置源。 返回 false 会取消此轮询(类似于前面提到的 PollSkipAdvice)。

  • Message<?> afterReceive(Message<?> result, Object source) 此方法在 receive() 方法之后调用。 同样,你可以重新配置源或采取任何行动(可能取决于结果,如果源未创建消息,结果可能为 null)。 你甚至可以返回不同的消息

Example 1. 线程安全

如果 Advice 改变了源,则不应使用 TaskExecutor 配置轮询器。 如果 Advice 改变了源,此类改变不是线程安全的,并且可能会导致意外结果,尤其是对于高频轮询器。 如果你需要并发处理轮询结果,请考虑使用下游 ExecutorChannel,而不是向轮询器添加执行器。

Example 2. 通知链排序

你应该了解通知链在初始化期间是如何处理的。 不实现 ReceiveMessageAdviceAdvice 对象应用于整个轮询过程,并且在任何 ReceiveMessageAdvice 之前按顺序调用。 然后 ReceiveMessageAdvice 对象围绕源 receive() 方法按顺序调用。 例如,如果你有 Advice 对象 a, b, c, d,其中 bdReceiveMessageAdvice,则这些对象按以下顺序应用:a, c, b, d。 此外,如果源已经是 Proxy,则 ReceiveMessageAdvice 在任何现有 Advice 对象之后调用。 如果你希望更改顺序,则必须自己连接代理。

SimpleActiveIdleReceiveMessageAdvice

此通知是 ReceiveMessageAdvice 的简单实现。 当与 DynamicPeriodicTrigger 结合使用时,它会根据上一次轮询是否产生消息来调整轮询频率。 轮询器还必须引用相同的 DynamicPeriodicTrigger

Example 3. 重要:异步移交

SimpleActiveIdleReceiveMessageAdvice 根据 receive() 结果修改触发器。 这仅当在轮询器线程上调用通知时才有效。 如果轮询器有 task-executor,则无效。 要在希望在轮询结果之后使用异步操作的情况下使用此通知,请稍后执行异步移交,例如通过使用 ExecutorChannel

CompoundTriggerAdvice

此通知允许根据轮询是否返回消息来选择两个触发器之一。 考虑一个使用 CronTrigger 的轮询器。 CronTrigger 实例是不可变的,因此一旦构建就无法更改。 考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果没有收到消息,则每分钟轮询一次,当检索到消息时,恢复使用 cron 表达式。

该通知(和轮询器)为此目的使用 CompoundTrigger。 触发器的 primary 触发器可以是 CronTrigger。 当通知检测到未收到消息时,它会将辅助触发器添加到 CompoundTrigger。 当 CompoundTrigger 实例的 nextExecutionTime 方法被调用时,如果存在辅助触发器,它会委托给辅助触发器。 否则,它会委托给主触发器。

轮询器还必须引用相同的 CompoundTrigger

以下示例显示了每小时 cron 表达式的配置,并带有每分钟回退:

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
Example 4. 重要:异步移交

CompoundTriggerAdvice 根据 receive() 结果修改触发器。 这仅当在轮询器线程上调用通知时才有效。 如果轮询器有 task-executor,则无效。 要在希望在轮询结果之后使用异步操作的情况下使用此通知,请稍后执行异步移交,例如通过使用 ExecutorChannel

仅限 MessageSource 的通知

某些通知可能仅适用于 MessageSource.receive(),而对 PollableChannel 没有意义。 为此,MessageSourceMutator 接口(ReceiveMessageAdvice 的扩展)仍然存在。 有关更多信息,请参阅 入站通道适配器:轮询多个服务器和目录