消息通道实现
Spring Integration 提供了不同的消息通道实现。 以下章节简要描述了每种实现。
PublishSubscribeChannel
PublishSubscribeChannel
实现将其接收到的任何 Message
广播给所有订阅的处理程序。
这最常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常旨在由单个处理程序处理)。
请注意,PublishSubscribeChannel
仅用于发送。
由于在调用其 send(Message)
方法时,它直接广播给其订阅者,因此消费者无法轮询消息(它不实现 PollableChannel
,因此没有 receive()
方法)。
相反,任何订阅者本身都必须是 MessageHandler
,并且订阅者的 handleMessage(Message)
方法会依次被调用。
在 3.0 版本之前,对没有订阅者的 PublishSubscribeChannel
调用 send
方法会返回 false
。
与 MessagingTemplate
结合使用时,会抛出 MessageDeliveryException
。
从 3.0 版本开始,行为已更改为:如果至少存在最小数量的订阅者(并且成功处理了消息),则发送始终被视为成功。
此行为可以通过设置 minSubscribers
属性来修改,该属性默认为 0
。
如果您使用 |
QueueChannel
QueueChannel
实现封装了一个队列。
与 PublishSubscribeChannel
不同,QueueChannel
具有点对点语义。
换句话说,即使通道有多个消费者,也只有一个消费者应该接收发送到该通道的任何 Message
。
它提供了一个默认的无参构造函数(提供基本上无限制的 Integer.MAX_VALUE
容量)以及一个接受队列容量的构造函数,如下所示:
public QueueChannel(int capacity)
未达到容量限制的通道将其消息存储在其内部队列中,并且 send(Message<?>)
方法会立即返回,即使没有接收者准备好处理消息。
如果队列已达到容量,则发送方会阻塞,直到队列中有可用空间。
或者,如果您使用带有额外超时参数的发送方法,则队列会阻塞,直到有可用空间或超时时间流逝,以先发生者为准。
同样,如果队列中有消息可用,receive()
调用会立即返回;但是,如果队列为空,则 receive
调用可能会阻塞,直到有消息可用或超时时间流逝(如果提供了超时)。
在任何一种情况下,通过传递超时值 0,都可以强制立即返回,无论队列的状态如何。
但是请注意,不带 timeout
参数的 send()
和 receive()
版本会无限期阻塞。
PriorityChannel
QueueChannel
强制先进先出(FIFO)排序,而 PriorityChannel
是一种替代实现,允许根据优先级在通道内对消息进行排序。
默认情况下,优先级由每条消息中的 priority
消息头决定。
但是,对于自定义优先级确定逻辑,可以将类型为 Comparator<Message<?>>
的比较器提供给 PriorityChannel
构造函数。
RendezvousChannel
RendezvousChannel
实现了“直接切换”场景,其中发送方会阻塞,直到另一方调用通道的 receive()
方法。
另一方会阻塞,直到发送方发送消息。
在内部,此实现与 QueueChannel
非常相似,只是它使用 SynchronousQueue
(BlockingQueue
的零容量实现)。
这在发送方和接收方在不同线程中操作,但异步将消息放入队列不合适的情况下效果很好。
换句话说,使用 RendezvousChannel
,发送方知道某个接收方已接受消息,而使用 QueueChannel
,消息将被存储到内部队列中,并可能永远不会被接收。
请记住,所有这些基于队列的通道默认都只在内存中存储消息。
当需要持久性时,您可以在 |
RendezvousChannel
也可用于实现请求-回复操作。
发送方可以创建一个临时的、匿名的 RendezvousChannel
实例,然后在构建 Message
时将其设置为 replyChannel
消息头。
发送该 Message
后,发送方可以立即调用 receive
(可选地提供超时值)以阻塞等待回复 Message
。
这与许多 Spring Integration 请求-回复组件内部使用的实现非常相似。
DirectChannel
DirectChannel
具有点对点语义,但在其他方面更类似于 PublishSubscribeChannel
,而不是前面描述的任何基于队列的通道实现。
它实现了 SubscribableChannel
接口而不是 PollableChannel
接口,因此它直接将消息分派给订阅者。
然而,作为点对点通道,它与 PublishSubscribeChannel
的不同之处在于,它将每个 Message
发送给单个订阅的 MessageHandler
。
除了是最简单的点对点通道选项之外,它最重要的特性之一是它允许单个线程执行通道“两端”的操作。
例如,如果处理程序订阅了 DirectChannel
,那么向该通道发送 Message
会直接在发送方的线程中触发该处理程序的 handleMessage(Message)
方法的调用,然后 send()
方法调用才能返回。
提供具有此行为的通道实现的主要动机是支持必须跨通道的事务,同时仍然受益于通道提供的抽象和松耦合。
如果 send()
调用在事务范围内被调用,则处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)中起作用。
由于 |
DirectChannel
内部委托给消息调度器来调用其订阅的消息处理程序,并且该调度器可以具有由 load-balancer
或 load-balancer-ref
属性(互斥)公开的负载平衡策略。
当多个消息处理程序订阅同一个通道时,消息调度器使用负载平衡策略来帮助确定消息如何在消息处理程序之间分发。
为了方便起见,load-balancer
属性公开了一个枚举值,指向 LoadBalancingStrategy
的预先存在的实现。
round-robin
(按轮换方式在处理程序之间进行负载平衡)和 none
(用于明确禁用负载平衡的情况)是唯一可用的值。
其他策略实现可能会在未来版本中添加。
但是,从 3.0 版本开始,您可以提供自己的 LoadBalancingStrategy
实现,并使用 load-balancer-ref
属性将其注入,该属性应指向实现 LoadBalancingStrategy
的 bean,如下例所示:
FixedSubscriberChannel
是一个 SubscribableChannel
,它只支持单个 MessageHandler
订阅者,并且不能取消订阅。
这对于没有其他订阅者且不需要通道拦截器的高吞吐量性能用例非常有用。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
请注意,load-balancer
和 load-balancer-ref
属性是互斥的。
负载平衡还与布尔 failover
属性结合使用。
如果 failover
值为 true(默认值),当前面的处理程序抛出异常时,调度器会回退到任何后续处理程序(如有必要)。
顺序由处理程序本身定义的可选顺序值决定,或者,如果不存在此类值,则由处理程序订阅的顺序决定。
如果某个情况要求调度器始终尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序序列回退,则不应提供负载平衡策略。
换句话说,即使未启用负载平衡,调度器仍然支持 failover
布尔属性。
然而,如果没有负载平衡,处理程序的调用总是从第一个开始,根据它们的顺序。
例如,当主、次、三等有明确定义时,这种方法效果很好。
当使用命名空间支持时,任何端点上的 order
属性决定了顺序。
请记住,负载平衡和 |
从 5.2 版本开始,当 failover
为 true 时,当前处理程序的失败以及失败的消息将分别在 debug
或 info
下记录(如果已配置)。
ExecutorChannel
ExecutorChannel
是一个点对点通道,支持与 DirectChannel
相同的调度器配置(负载平衡策略和 failover
布尔属性)。
这两种调度通道类型之间的主要区别在于 ExecutorChannel
委托给 TaskExecutor
实例来执行调度。
这意味着 send
方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。
因此,它不支持跨越发送方和接收处理程序的事务。
发送方有时会阻塞。
例如,当使用具有客户端节流拒绝策略(例如 ThreadPoolExecutor.CallerRunsPolicy
)的 TaskExecutor
时,当线程池达到最大容量且执行器的工作队列已满时,发送方的线程可以随时执行该方法。
由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行事务处理。
PartitionedChannel
从 6.1 版本开始,提供了 PartitionedChannel
实现。
这是 AbstractExecutorChannel
的扩展,表示点对点调度逻辑,其中实际消费在特定线程上处理,该线程由从发送到此通道的消息评估的 partition key 确定。
此通道类似于前面提到的 ExecutorChannel
,但不同之处在于,具有相同 partition key 的消息始终在同一个线程中处理,从而保留了顺序。
它不需要外部 TaskExecutor
,但可以使用自定义 ThreadFactory
进行配置(例如 Thread.ofVirtual().name("partition-", 0).factory()
)。
此工厂用于将单线程执行器填充到每个分区的 MessageDispatcher
委托中。
默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头用作 partition key。
此通道可以配置为简单的 bean:
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
该通道将有 3
个分区 - 专用线程;将使用 partitionKey
消息头来确定消息将在哪个分区中处理。
有关更多信息,请参阅 PartitionedChannel
类的 Javadoc。
FluxMessageChannel
FluxMessageChannel
是 org.reactivestreams.Publisher
的一个实现,用于将发送的消息“下沉”到内部 reactor.core.publisher.Flux
中,以供下游的响应式订阅者按需消费。
此通道实现既不是 SubscribableChannel
,也不是 PollableChannel
,因此只有 org.reactivestreams.Subscriber
实例才能用于从该通道消费,并遵循响应式流的背压特性。
另一方面,FluxMessageChannel
实现了 ReactiveStreamsSubscribableChannel
,其 subscribeTo(Publisher<Message<?>>)
契约允许从响应式源发布者接收事件,将响应式流桥接到集成流中。
为了实现整个集成流的完全响应式行为,必须在流中的所有端点之间放置此类通道。
有关与响应式流交互的更多信息,请参阅 Reactive Streams Support。
作用域通道
Spring Integration 1.0 提供了 ThreadLocalChannel
实现,但在 2.0 版本中已删除。
现在处理相同需求的更通用的方法是向通道添加 scope
属性。
该属性的值可以是上下文中可用的作用域的名称。
例如,在 Web 环境中,某些作用域是可用的,并且任何自定义作用域实现都可以注册到上下文中。
以下示例展示了将线程局部作用域应用于通道,包括作用域本身的注册:
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
上例中定义的通道也内部委托给一个队列,但该通道绑定到当前线程,因此队列的内容也同样绑定。
这样,发送到通道的线程稍后可以接收相同的消息,但其他线程无法访问它们。
虽然线程作用域通道很少需要,但它们在 DirectChannel
实例用于强制单线程操作但任何回复消息都应发送到“终端”通道的情况下可能很有用。
如果该终端通道是线程作用域的,则原始发送线程可以从终端通道收集其回复。
现在,由于任何通道都可以具有作用域,除了线程局部作用域之外,您还可以定义自己的作用域。