轮询器
本节描述了 Spring Integration 中轮询的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一:
实际实现取决于这些端点连接的通道类型。
连接到实现 org.springframework.messaging.SubscribableChannel
接口的通道的通道适配器会生成 EventDrivenConsumer
实例。
另一方面,连接到实现 org.springframework.messaging.PollableChannel
接口(例如 QueueChannel
)的通道的通道适配器会生成 PollingConsumer
实例。
轮询消费者允许 Spring Integration 组件主动轮询消息,而不是以事件驱动的方式处理消息。
它们代表了许多消息传递场景中的一个关键横切关注点。 在 Spring Integration 中,轮询消费者基于 Gregor Hohpe 和 Bobby Woolf 所著的《企业集成模式》一书中描述的同名模式。 你可以在 该书的网站上找到该模式的描述。
有关轮询消费者配置的更多信息,请参阅 消息端点。
可轮询消息源
Spring Integration 提供了轮询消费者模式的第二种变体。
当使用入站通道适配器时,这些适配器通常由 SourcePollingChannelAdapter
包装。
例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器 中描述的适配器配置了一个轮询器以定期检索消息。
因此,当组件配置了轮询器时,生成的实例属于以下类型之一:
这意味着轮询器在入站和出站消息传递场景中都会使用。 以下是一些使用轮询器的用例:
-
轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如重复执行 Java 类上的方法)
AOP 通知类可以应用于轮询器中的 |
延迟确认可轮询消息源
从 5.0.1 版开始,某些模块提供了 MessageSource
实现,这些实现支持将确认延迟到下游流完成(或将消息传递给另一个线程)为止。
这目前仅限于 AmqpMessageSource
和 KafkaMessageSource
。
对于这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
头(请参阅 MessageHeaderAccessor
API)会添加到消息中。
当与可轮询消息源一起使用时,头的 值是 AcknowledgmentCallback
的实例,如下例所示:
@FunctionalInterface
public interface AcknowledgmentCallback extends SimpleAcknowledgment {
void acknowledge(Status status);
@Override
default void acknowledge() {
acknowledge(Status.ACCEPT);
}
default boolean isAcknowledged() {
return false;
}
default void noAutoAck() {
throw new UnsupportedOperationException("You cannot disable auto acknowledgment with this implementation");
}
default boolean isAutoAck() {
return true;
}
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如 KafkaMessageSource
)都支持 REJECT
状态。
它被视为与 ACCEPT
相同。
应用程序可以随时确认消息,如下例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果 MessageSource
连接到 SourcePollingChannelAdapter
,当轮询器线程在下游流完成后返回到适配器时,适配器会检查确认是否已确认,如果尚未确认,则将其状态设置为 ACCEPT
(如果流抛出异常,则设置为 REJECT
)。
状态值在 AcknowledgmentCallback.Status
枚举中定义。
Spring Integration 提供了 MessageSourcePollingTemplate
来执行 MessageSource
的临时轮询。
当 MessageHandler
回调返回(或抛出异常)时,这也会负责在 AcknowledgmentCallback
上设置 ACCEPT
或 REJECT
。
以下示例展示了如何使用 MessageSourcePollingTemplate
进行轮询:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况(SourcePollingChannelAdapter
和 MessageSourcePollingTemplate
)下,你都可以通过调用回调上的 noAutoAck()
来禁用自动确认/拒绝。
如果你将消息传递给另一个线程并希望稍后确认,则可能会这样做。
并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。
消息源的条件轮询器
本节介绍如何使用条件轮询器。
背景
轮询器上 advice-chain
中的 Advice
对象会通知整个轮询任务(包括消息检索和处理)。
这些“环绕通知”方法无法访问轮询的任何上下文 — 只能访问轮询本身。
这对于将任务事务化或由于某些外部条件而跳过轮询等要求来说是很好的,如前所述。
如果我们希望根据轮询的 receive
部分的结果采取一些行动,或者如果我们想根据条件调整轮询器,该怎么办?对于这些情况,Spring Integration 提供了“智能”轮询。
“智能”轮询
5.3 版引入了 ReceiveMessageAdvice
接口。
advice-chain
中实现此接口的任何 Advice
对象仅应用于 receive()
操作 — MessageSource.receive()
和 PollableChannel.receive(timeout)
。
因此,它们只能应用于 SourcePollingChannelAdapter
或 PollingConsumer
。
此类实现以下方法:
-
beforeReceive(Object source)
此方法在Object.receive()
方法之前调用。 它允许你检查和重新配置源。 返回false
会取消此轮询(类似于前面提到的PollSkipAdvice
)。 -
Message<?> afterReceive(Message<?> result, Object source)
此方法在receive()
方法之后调用。 同样,你可以重新配置源或采取任何行动(可能取决于结果,如果源未创建消息,结果可能为null
)。 你甚至可以返回不同的消息
如果 Advice
改变了源,则不应使用 TaskExecutor
配置轮询器。
如果 Advice
改变了源,此类改变不是线程安全的,并且可能会导致意外结果,尤其是对于高频轮询器。
如果你需要并发处理轮询结果,请考虑使用下游 ExecutorChannel
,而不是向轮询器添加执行器。
你应该了解通知链在初始化期间是如何处理的。
不实现 ReceiveMessageAdvice
的 Advice
对象应用于整个轮询过程,并且在任何 ReceiveMessageAdvice
之前按顺序调用。
然后 ReceiveMessageAdvice
对象围绕源 receive()
方法按顺序调用。
例如,如果你有 Advice
对象 a, b, c, d
,其中 b
和 d
是 ReceiveMessageAdvice
,则这些对象按以下顺序应用:a, c, b, d
。
此外,如果源已经是 Proxy
,则 ReceiveMessageAdvice
在任何现有 Advice
对象之后调用。
如果你希望更改顺序,则必须自己连接代理。
SimpleActiveIdleReceiveMessageAdvice
此通知是 ReceiveMessageAdvice
的简单实现。
当与 DynamicPeriodicTrigger
结合使用时,它会根据上一次轮询是否产生消息来调整轮询频率。
轮询器还必须引用相同的 DynamicPeriodicTrigger
。
SimpleActiveIdleReceiveMessageAdvice
根据 receive()
结果修改触发器。
这仅当在轮询器线程上调用通知时才有效。
如果轮询器有 task-executor
,则无效。
要在希望在轮询结果之后使用异步操作的情况下使用此通知,请稍后执行异步移交,例如通过使用 ExecutorChannel
。
CompoundTriggerAdvice
此通知允许根据轮询是否返回消息来选择两个触发器之一。
考虑一个使用 CronTrigger
的轮询器。
CronTrigger
实例是不可变的,因此一旦构建就无法更改。
考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果没有收到消息,则每分钟轮询一次,当检索到消息时,恢复使用 cron 表达式。
该通知(和轮询器)为此目的使用 CompoundTrigger
。
触发器的 primary
触发器可以是 CronTrigger
。
当通知检测到未收到消息时,它会将辅助触发器添加到 CompoundTrigger
。
当 CompoundTrigger
实例的 nextExecutionTime
方法被调用时,如果存在辅助触发器,它会委托给辅助触发器。
否则,它会委托给主触发器。
轮询器还必须引用相同的 CompoundTrigger
。
以下示例显示了每小时 cron 表达式的配置,并带有每分钟回退:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
CompoundTriggerAdvice
根据 receive()
结果修改触发器。
这仅当在轮询器线程上调用通知时才有效。
如果轮询器有 task-executor
,则无效。
要在希望在轮询结果之后使用异步操作的情况下使用此通知,请稍后执行异步移交,例如通过使用 ExecutorChannel
。
仅限 MessageSource 的通知
某些通知可能仅适用于 MessageSource.receive()
,而对 PollableChannel
没有意义。
为此,MessageSourceMutator
接口(ReceiveMessageAdvice
的扩展)仍然存在。
有关更多信息,请参阅 入站通道适配器:轮询多个服务器和目录。