线程屏障
有时,我们需要暂停消息流线程,直到发生其他异步事件。 例如,考虑一个将消息发布到 RabbitMQ 的 HTTP 请求。 我们可能希望在 RabbitMQ 代理发出消息已收到的确认之前,不回复用户。
在 4.2 版本中,Spring Integration 为此引入了 <barrier/>
组件。
底层 MessageHandler
是 BarrierMessageHandler
。
此类的 trigger()
方法也实现了 MessageTriggerAction
,其中传递给 trigger()
方法的消息会释放 handleRequestMessage()
方法中相应的线程(如果存在)。
通过对消息调用 CorrelationStrategy
来关联被暂停的线程和触发线程。
当消息发送到 input-channel
时,线程会暂停长达 requestTimeout
毫秒,等待相应的触发消息。
默认的关联策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID
头。
当带有相同关联的触发消息到达时,线程被释放。
释放后发送到 output-channel
的消息是通过使用 MessageGroupProcessor
构建的。
默认情况下,消息是两个有效载荷的 Collection<?>
,并且头通过 DefaultAggregatingMessageGroupProcessor
合并。
如果 trigger()
方法首先被调用(或在主线程超时之后),它将暂停长达 triggerTimeout
,等待暂停消息的到来。
如果您不想暂停触发线程,请考虑将其交给 TaskExecutor
,以便暂停其线程。
在 5.4 版本之前,请求和触发消息只有一个 |
requires-reply
属性决定了如果被暂停的线程在触发消息到达之前超时,应采取的行动。
默认情况下,它为 false
,这意味着端点返回 null
,流结束,线程返回给调用者。
当为 true
时,会抛出 ReplyRequiredException
。
您可以编程方式调用 trigger()
方法(通过名称 barrier.handler
获取 bean 引用——其中 barrier
是屏障端点的 bean 名称)。
或者,您可以配置一个 <outbound-channel-adapter/>
来触发释放。
只有一个线程可以与相同的关联暂停。 相同的关联可以多次使用,但只能并发使用一次。 如果第二个线程以相同的关联到达,则会抛出异常。
以下示例展示了如何使用自定义头进行关联:
-
Java
-
XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根据哪个消息先到达,发送消息到 in
的线程或发送消息到 release
的线程将等待长达十秒,直到另一个消息到达。
当消息被释放时,out
通道会发送一条消息,该消息结合了调用名为 myOutputProcessor
的自定义 MessageGroupProcessor
bean 的结果。
如果主线程超时并且触发器稍后到达,您可以配置一个丢弃通道,将延迟触发器发送到该通道。
如果请求消息未及时到达,触发消息也会被丢弃。
有关此组件的示例,请参阅 屏障示例应用程序。