配置消息通道
要创建消息通道实例,你可以使用 XML 的 <channel/>
元素或 Java 配置的 DirectChannel
实例,如下所示:
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当你使用不带任何子元素的 <channel/>
元素时,它会创建一个 DirectChannel
实例(一个 SubscribableChannel
)。
要创建发布-订阅通道,请使用 <publish-subscribe-channel/>
元素(Java 中的 PublishSubscribeChannel
),如下所示:
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
你还可以提供各种 <queue/>
子元素来创建任何可轮询通道类型(如 消息通道实现 中所述)。
以下各节显示了每种通道类型的示例。
DirectChannel
配置
如前所述,DirectChannel
是默认类型。
以下清单显示了如何定义一个:
-
Java
-
XML
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认通道具有循环负载均衡器,并且也启用了故障转移(有关详细信息,请参阅 DirectChannel
)。
要禁用其中一个或两个功能,请添加一个 <dispatcher/>
子元素(DirectChannel
的 LoadBalancingStrategy
构造函数)并按如下方式配置属性:
-
Java
-
XML
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</int:channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
从 6.3 版本开始,所有基于 UnicastingDispatcher
的 MessageChannel
实现都可以配置 Predicate<Exception> failoverStrategy
,而不是简单的 failover
选项。
这个谓词根据当前 MessageHandler
抛出的异常来决定是否故障转移到下一个 MessageHandler
。
更复杂的错误分析应该使用 ErrorMessageExceptionTypeRouter
来完成。
数据类型通道配置
有时,消费者只能处理特定类型的有效载荷,这迫使你必须确保输入消息的有效载荷类型。 首先想到的可能是使用消息过滤器。 然而,消息过滤器所能做的只是过滤掉不符合消费者要求的消息。 另一种方法是使用基于内容的路由器,并将具有不符合数据类型的消息路由到特定的转换器,以强制转换和转换为所需的数据类型。 这会奏效,但实现相同目的的一种更简单的方法是应用 数据类型通道 模式。 你可以为每种特定的有效载荷数据类型使用单独的数据类型通道。
要创建只接受包含特定有效载荷类型的消息的数据类型通道,请在通道元素的 datatype
属性中提供数据类型的完全限定类名,如以下示例所示:
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
请注意,类型检查适用于任何可赋值给通道数据类型的类型。
换句话说,前面示例中的 numberChannel
将接受其有效载荷为 java.lang.Integer
或 java.lang.Double
的消息。
可以提供多个类型作为逗号分隔的列表,如以下示例所示:
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前面示例中的“numberChannel”只接受数据类型为 java.lang.Number
的消息。
但是,如果消息的有效载荷不是所需类型会发生什么?
这取决于你是否定义了一个名为 integrationConversionService
的 bean,它是一个 Spring 的 转换服务 实例。
如果不是,则会立即抛出 Exception
。
但是,如果你定义了一个 integrationConversionService
bean,它将尝试将消息的有效载荷转换为可接受的类型。
你甚至可以注册自定义转换器。
例如,假设你向我们上面配置的“numberChannel”发送一个带有 String
有效载荷的消息。
你可能会按如下方式处理该消息:
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,这将是一个完全合法的操作。 然而,由于我们使用了数据类型通道,这种操作的结果将生成一个类似于以下的异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
...
发生异常是因为我们要求有效载荷类型为 Number
,但我们发送了 String
。
所以我们需要一些东西来将 String
转换为 Number
。
为此,我们可以实现一个类似于以下示例的转换器:
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后我们可以将其注册为集成转换服务的转换器,如以下示例所示:
-
Java
-
XML
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者在 StringToIntegerConverter
类上,当它被 @Component
注解标记以进行自动扫描时。
解析“converter”元素时,如果尚未定义 integrationConversionService
bean,则会创建该 bean。
有了该转换器,send
操作现在将成功,因为数据类型通道使用该转换器将 String
有效载荷转换为 Integer
。
有关有效载荷类型转换的更多信息,请参阅 有效载荷类型转换。
从 4.0 版本开始,integrationConversionService
由 DefaultDatatypeChannelMessageConverter
调用,它在应用程序上下文中查找转换服务。
要使用不同的转换技术,你可以在通道上指定 message-converter
属性。
这必须是对 MessageConverter
实现的引用。
只使用 fromMessage
方法。
它为转换器提供了对消息头的访问(以防转换可能需要来自头的信息,例如 content-type
)。
该方法可以只返回转换后的有效载荷或完整的 Message
对象。
如果是后者,转换器必须小心地从入站消息中复制所有头。
或者,你可以声明一个 ID 为 datatypeChannelMessageConverter
的 MessageConverter
类型的 <bean/>
,并且该转换器将由所有具有 datatype
的通道使用。
QueueChannel
配置
要创建 QueueChannel
,请使用 <queue/>
子元素。
你可以按如下方式指定通道的容量:
-
Java
-
XML
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果未在此 |
持久化 QueueChannel
配置
由于 QueueChannel
提供了缓冲消息的能力,但默认情况下只在内存中进行,因此它也引入了在系统故障时消息可能丢失的可能性。
为了降低这种风险,QueueChannel
可以由 MessageGroupStore
策略接口的持久化实现支持。
有关 MessageGroupStore
和 MessageStore
的更多详细信息,请参阅 消息存储。
当使用 message-store
属性时,不允许使用 capacity
属性。
当 QueueChannel
收到 Message
时,它会将消息添加到消息存储中。
当从 QueueChannel
轮询 Message
时,它会从消息存储中移除。
默认情况下,QueueChannel
将其消息存储在内存队列中,这可能导致前面提到的消息丢失场景。
然而,Spring Integration 提供了持久化存储,例如 JdbcChannelMessageStore
。
你可以通过添加 message-store
属性为任何 QueueChannel
配置消息存储,如以下示例所示:
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(有关 Java/Kotlin 配置选项,请参阅下面的示例。)
Spring Integration JDBC 模块还为许多流行的数据库提供了模式数据定义语言 (DDL)。
这些模式位于该模块 (spring-integration-jdbc
) 的 org.springframework.integration.jdbc.store.channel 包中。
一个重要的特性是,对于任何事务性持久化存储(例如 JdbcChannelMessageStore
),只要轮询器配置了事务,从存储中移除的消息只有在事务成功完成时才能永久移除。
否则,事务将回滚,并且 Message
不会丢失。
随着越来越多的与“NoSQL”数据存储相关的 Spring 项目为这些存储提供底层支持,消息存储的许多其他实现也可用。
如果你找不到满足你特定需求的实现,你也可以提供自己的 MessageGroupStore
接口实现。
从 4.0 版本开始,我们建议尽可能将 QueueChannel
实例配置为使用 ChannelMessageStore
。
与通用消息存储相比,这些通常针对此用途进行了优化。
如果 ChannelMessageStore
是 ChannelPriorityMessageStore
,则消息按优先级顺序以 FIFO 接收。
优先级的概念由消息存储实现确定。
例如,以下示例显示了 MongoDB 通道消息存储 的 Java 配置:
-
Java
-
Java DSL
-
Kotlin DSL
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
请注意 |
自定义 QueueChannel
环境的另一个选项由 <int:queue>
子元素的 ref
属性或其特定构造函数提供。
此属性提供对任何 java.util.Queue
实现的引用。
例如,Hazelcast 分布式 IQueue
可以按如下方式配置:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel
配置
要创建 PublishSubscribeChannel
,请使用 <publish-subscribe-channel/>
元素。
使用此元素时,你还可以指定用于发布消息的 task-executor
(如果未指定,则在发送者的线程中发布),如下所示:
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果你在 PublishSubscribeChannel
的下游提供了一个重新排序器或聚合器,你可以将通道上的“apply-sequence”属性设置为 true
。
这样做表明通道应在传递消息之前设置 sequence-size
和 sequence-number
消息头以及关联 ID。
例如,如果有五个订阅者,sequence-size
将设置为 5
,并且消息将具有 sequence-number
头值范围从 1
到 5
。
除了 Executor
之外,你还可以配置 ErrorHandler
。
默认情况下,PublishSubscribeChannel
使用 MessagePublishingErrorHandler
实现将错误从 errorChannel
头发送到 MessageChannel
,或发送到全局 errorChannel
实例。
如果未配置 Executor
,则 ErrorHandler
将被忽略,并且异常将直接抛给调用者的线程。
如果你在 PublishSubscribeChannel
的下游提供了一个 Resequencer
或 Aggregator
,你可以将通道上的“apply-sequence”属性设置为 true
。
这样做表明通道应在传递消息之前设置 sequence-size
和 sequence-number
消息头以及关联 ID。
例如,如果有五个订阅者,sequence-size
将设置为 5
,并且消息将具有 sequence-number
头值范围从 1
到 5
。
以下示例显示了如何将 apply-sequence
头设置为 true
:
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
|
从 5.4.3 版本开始,PublishSubscribeChannel
还可以配置其 BroadcastingDispatcher
的 requireSubscribers
选项,以指示当没有订阅者时,此通道不会默默地忽略消息。
当没有订阅者且此选项设置为 true
时,会抛出带有 Dispatcher has no subscribers
消息的 MessageDispatchingException
。
ExecutorChannel
要创建 ExecutorChannel
,请添加带有 task-executor
属性的 <dispatcher>
子元素。
该属性的值可以引用上下文中的任何 TaskExecutor
。
例如,这样做可以配置一个线程池,用于将消息分派给订阅的处理程序。
如前所述,这样做会打破发送方和接收方之间的单线程执行上下文,从而使任何活动的事务上下文不会被处理程序的调用共享(也就是说,处理程序可能会抛出 Exception
,但 send
调用已经成功返回)。
以下示例显示了如何使用 dispatcher
元素并指定 task-executor
属性中的执行器:
-
Java
-
XML
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
PriorityChannel
配置
要创建 PriorityChannel
,请使用 <priority-queue/>
子元素,如以下示例所示:
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,通道会查看消息的 priority
标头。
但是,你可以改为提供自定义的 Comparator
引用。
此外,请注意 PriorityChannel
(与其他类型一样)确实支持 datatype
属性。
与 QueueChannel
一样,它也支持 capacity
属性。
以下示例演示了所有这些:
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
从 4.0 版本开始,priority-channel
子元素支持 message-store
选项(在这种情况下不允许使用 comparator
和 capacity
)。
消息存储必须是 PriorityCapableChannelMessageStore
。
目前,Redis
、JDBC
和 MongoDB
提供了 PriorityCapableChannelMessageStore
的实现。
有关更多信息,请参阅 QueueChannel
配置 和 消息存储。
你可以在 支持消息通道 中找到示例配置。
RendezvousChannel
配置
当队列子元素是 <rendezvous-queue>
时,会创建一个 RendezvousChannel
。
它不提供任何前面描述的额外配置选项,并且其队列不接受任何容量值,因为它是一个零容量的直接传递队列。
以下示例显示了如何声明 RendezvousChannel
:
-
Java
-
XML
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道拦截器配置
消息通道也可以有拦截器,如 通道拦截器 中所述。
<interceptors/>
子元素可以添加到 <channel/>
(或更具体的元素类型)。
你可以提供 ref
属性来引用任何实现 ChannelInterceptor
接口的 Spring 管理对象,如以下示例所示:
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
通常,我们建议在单独的位置定义拦截器实现,因为它们通常提供可以在多个通道之间重用的通用行为。
全局通道拦截器配置
通道拦截器提供了一种干净简洁的方式,可以为每个单独的通道应用横切行为。 如果相同的行为应该应用于多个通道,那么为每个通道配置相同的拦截器集将不是最有效的方式。 为了避免重复配置,同时使拦截器能够应用于多个通道,Spring Integration 提供了全局拦截器。 考虑以下两对示例:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每个 <channel-interceptor/>
元素都允许你定义一个全局拦截器,它将应用于所有匹配 pattern
属性定义的任何模式的通道。
在前面的示例中,全局拦截器应用于“thing1”通道以及所有以“thing2”或“input”开头但 不 以“thing3”开头的其他通道(从 5.0 版本开始)。
此语法添加到模式中可能会导致一个可能的问题(尽管可能不太可能)。
如果你有一个名为 !thing1
的 bean,并且你在通道拦截器的 pattern
模式中包含了一个 !thing1
的模式,它将不再匹配。
该模式现在匹配所有不名为 thing1
的 bean。
在这种情况下,你可以用 \
转义模式中的 !
。
模式 \!thing1
匹配名为 !thing1
的 bean。
order
属性允许你管理当给定通道上有多个拦截器时,此拦截器注入的位置。
例如,通道“inputChannel”可以配置本地的单个拦截器(见下文),如以下示例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是“全局拦截器是如何相对于本地配置的其他拦截器或通过其他全局拦截器定义注入的?”
当前的实现提供了一种简单的机制来定义拦截器执行的顺序。
order
属性中的正数确保在任何现有拦截器之后注入拦截器,而负数确保在现有拦截器之前注入拦截器。
这意味着,在前面的示例中,全局拦截器是在本地配置的“wire-tap”拦截器之后注入的(因为它的 order
大于 0
)。
如果存在另一个具有匹配 pattern
的全局拦截器,则其顺序将通过比较两个拦截器的 order
属性值来确定。
要在现有拦截器之前注入全局拦截器,请使用 order
属性的负值。
请注意, |
消息窃听器 (Wire Tap)
如前所述,Spring Integration 提供了一个简单的消息窃听器拦截器。
你可以在 <interceptors/>
元素中的任何通道上配置消息窃听器。
这样做对于调试特别有用,并且可以与 Spring Integration 的日志通道适配器结合使用,如下所示:
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
“logging-channel-adapter”也接受一个“expression”属性,这样你就可以对“payload”和“headers”变量评估一个 SpEL 表达式。
或者,要记录完整的消息 |
关于消息窃听器和其他类似组件(消息发布配置)的一个常见误解是,它们本质上是自动异步的。 默认情况下,消息窃听器作为一个组件不会异步调用。 相反,Spring Integration 专注于一种统一的方法来配置异步行为:消息通道。 使消息流的某些部分同步或异步的是在该流中配置的消息通道的类型。 这是消息通道抽象的主要好处之一。 从框架诞生之日起,我们就一直强调消息通道作为框架的一等公民的必要性和价值。 它不仅仅是 EIP 模式的内部隐式实现。 它完全作为可配置组件暴露给最终用户。 因此,消息窃听器组件仅负责执行以下任务:
-
通过窃听通道(例如
channelA
)拦截消息流 -
获取每条消息
-
将消息发送到另一个通道(例如
channelB
)
它本质上是桥接模式的一种变体,但它封装在通道定义中(因此更容易启用和禁用而不会中断流)。 此外,与桥接不同,它基本上分叉了另一个消息流。 该流是同步还是异步?答案取决于“channelB”的消息通道类型。 我们有以下选项:直接通道、可轮询通道和执行器通道。 后两者打破了线程边界,使得通过这些通道的通信是异步的,因为从该通道到其订阅处理程序的消息分派发生在与用于将消息发送到该通道的线程不同的线程上。 这就是使你的消息窃听流同步或异步的原因。 它与框架中的其他组件(例如消息发布者)保持一致,并通过让你无需提前担心(除了编写线程安全代码)特定代码段是应该实现为同步还是异步,从而增加了一致性和简单性。 通过消息通道连接两段代码(例如组件 A 和组件 B)的实际连接使它们的协作同步或异步。 你甚至可能希望将来从同步更改为异步,消息通道允许你快速完成,而无需触及代码。
关于消息窃听器的最后一点是,尽管上面提供了默认不异步的理由,但你应该记住,通常希望尽快将消息传递出去。 因此,将异步通道选项用作消息窃听器的出站通道是很常见的。 但是,异步行为默认情况下不强制执行。 如果我们这样做,会有许多用例会中断,包括你可能不希望打破事务边界。 也许你将消息窃听模式用于审计目的,并且你确实希望审计消息在原始事务中发送。 例如,你可以将消息窃听器连接到 JMS 出站通道适配器。 这样,你就可以两全其美:1) JMS 消息的发送可以在事务中发生,而 2) 它仍然是“即发即忘”操作,从而防止主消息流中出现任何明显的延迟。
从 4.0 版本开始,当拦截器(例如 WireTap
类)引用通道时,避免循环引用非常重要。
你需要将此类通道从当前拦截器拦截的通道中排除。
这可以通过适当的模式或编程方式完成。
如果你有一个自定义的 ChannelInterceptor
引用了一个 channel
,请考虑实现 VetoCapableInterceptor
。
这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。
你还可以在拦截器方法中添加运行时保护,以确保通道不是拦截器引用的通道。
WireTap
使用了这两种技术。
从 4.3 版本开始,WireTap
提供了额外的构造函数,它们接受 channelName
而不是 MessageChannel
实例。
这对于 Java 配置和使用通道自动创建逻辑时非常方便。
目标 MessageChannel
bean 在稍后,在第一次与拦截器交互时,从提供的 channelName
中解析。
通道解析需要 BeanFactory
,因此消息窃听器实例必须是 Spring 管理的 bean。
这种延迟绑定方法还简化了 Java DSL 配置中典型的消息窃听模式,如以下示例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
条件消息窃听器
消息窃听器可以通过使用 selector
或 selector-expression
属性进行条件化。
selector
引用一个 MessageSelector
bean,它可以在运行时确定消息是否应该发送到窃听通道。
类似地,selector-expression
是一个布尔 SpEL 表达式,它具有相同的目的:如果表达式评估为 true
,则消息发送到窃听通道。
全局消息窃听器配置
可以将全局消息窃听器配置为 全局通道拦截器配置 的一个特殊情况。
为此,请配置一个顶层 wire-tap
元素。
现在,除了正常的 wire-tap
命名空间支持之外,还支持 pattern
和 order
属性,并且它们的工作方式与 channel-interceptor
完全相同。
以下示例显示了如何配置全局消息窃听器:
-
Java
-
XML
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局消息窃听器提供了一种方便的方式,可以在不修改现有通道配置的情况下,外部配置单通道消息窃听器。
为此,将 |