消息端点
本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 各种消息传递组件的底层 API 的许多信息。
如果您想真正理解幕后发生的事情,这些信息可能会有所帮助。
但是,如果您想使用简化的基于命名空间的各种元素配置来快速启动和运行,请暂时跳到 endpoint-namespace。
如概述中所述,消息端点负责将各种消息传递组件连接到通道。
在接下来的几章中,我们将介绍许多不同的消息消费组件。
其中一些还能够发送回复消息。
发送消息非常简单。
如 消息通道 中所示,您可以将消息发送到消息通道。
但是,接收消息则要复杂一些。
主要原因是存在两种类型的消费者:轮询消费者 和 事件驱动消费者。
在这两种类型中,事件驱动消费者要简单得多。
无需管理和调度单独的轮询线程,它们本质上是带有回调方法的监听器。
当连接到 Spring Integration 的可订阅消息通道时,这种简单的选项效果很好。
但是,当连接到缓冲的、可轮询的消息通道时,某个组件必须调度和管理轮询线程。
Spring Integration 提供了两种不同的端点实现来适应这两种类型的消费者。
因此,消费者本身只需实现回调接口。
当需要轮询时,端点充当消费者实例的容器。
其好处类似于使用容器来托管消息驱动 bean,但是,由于这些消费者是运行在 ApplicationContext
中的 Spring 管理对象,它更类似于 Spring 自己的 MessageListener
容器。
消息处理器
Spring Integration 的 MessageHandler
接口由框架内的许多组件实现。
换句话说,这不是公共 API 的一部分,您通常不会直接实现 MessageHandler
。
尽管如此,它还是由消息消费者用于实际处理已消费消息的,因此了解此策略接口有助于理解消费者的整体作用。
该接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管它很简单,但此接口为后续章节中介绍的大多数组件(路由器、转换器、分发器、聚合器、服务激活器等)奠定了基础。 这些组件对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,并且轮询和事件驱动行为之间的选择也是相同的。 Spring Integration 提供了两种端点实现,用于托管这些基于回调的处理程序,并允许它们连接到消息通道。
事件驱动消费者
由于事件驱动消费者端点更简单,我们首先介绍它。
您可能还记得 SubscribableChannel
接口提供了一个 subscribe()
方法,并且该方法接受一个 MessageHandler
参数(如 SubscribableChannel
中所示)。
以下清单显示了 subscribe
方法的定义:
subscribableChannel.subscribe(messageHandler);
由于订阅到通道的处理程序不必主动轮询该通道,因此这是一个事件驱动消费者,Spring Integration 提供的实现接受 SubscribableChannel
和 MessageHandler
,如以下示例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring Integration 还提供了一个 PollingConsumer
,它可以以相同的方式实例化,只是通道必须实现 PollableChannel
,如以下示例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询消费者还有许多其他配置选项。 以下示例显示了如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
PeriodicTrigger
通常使用简单的间隔 (Duration
) 定义,但也支持 initialDelay
属性和布尔 fixedRate
属性(默认为 false
,即没有固定延迟)。
以下示例设置了这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
上例中三个设置的结果是,触发器等待五秒钟,然后每秒触发一次。
CronTrigger
需要一个有效的 cron 表达式。
有关详细信息,请参阅 Javadoc。
以下示例设置了一个新的 CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上例中定义的触发器的结果是,触发器在周一至周五每十秒触发一次。
轮询端点的默认触发器是具有 1 秒固定延迟周期的 |
除了触发器之外,您还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPoll
和 receiveTimeout
。
以下示例显示了如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
maxMessagesPerPoll
属性指定在给定轮询操作中接收的最大消息数。
这意味着轮询器会继续调用 receive()
而不等待,直到返回 null
或达到最大值。
例如,如果轮询器具有十秒的间隔触发器和 maxMessagesPerPoll
设置为 25
,并且它正在轮询其队列中有 100 条消息的通道,则所有 100 条消息可以在 40 秒内检索。
它获取 25 条,等待十秒,获取接下来的 25 条,依此类推。
如果 maxMessagesPerPoll
配置为负值,则在单个轮询周期内调用 MessageSource.receive()
,直到它返回 null
。
从 5.5 版本开始,0
值具有特殊含义 - 完全跳过 MessageSource.receive()
调用,这可以被视为此轮询端点暂停,直到稍后将 maxMessagesPerPoll
更改为非零值,例如通过控制总线。
receiveTimeout
属性指定当调用接收操作时如果没有可用消息,轮询器应该等待的时间量。
例如,考虑两个表面上相似但实际上截然不同的选项:第一个选项的间隔触发器为 5 秒,接收超时为 50 毫秒;而第二个选项的间隔触发器为 50 毫秒,接收超时为 5 秒。
第一个选项可能会比它在通道上接受消息的时间晚 4950 毫秒(如果该消息在其一次轮询调用返回后立即到达)。
另一方面,第二个配置永远不会错过消息超过 50 毫秒。
区别在于第二个选项需要一个线程等待。
但是,结果是它可以更快地响应到达的消息。
这种技术,称为“长轮询
”,可以用于在轮询源上模拟事件驱动行为。
轮询消费者还可以委托给 Spring TaskExecutor
,如以下示例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer
具有一个名为 adviceChain
的属性。
此属性允许您指定一个 AOP 建议 List
,用于处理其他横切关注点,包括事务。
这些建议应用于 doPoll()
方法周围。
有关更深入的信息,请参阅 endpoint-namespace 下的 AOP 建议链和事务支持部分。
另请参阅 @Poller
注解 Javadocs 和相应的 消息注解支持 部分。
Java DSL 还提供了 .poller()
端点配置选项及其相应的 Pollers
工厂。
前面的示例显示了依赖查找。
但是,请记住,这些消费者最常配置为 Spring bean 定义。
事实上,Spring Integration 还提供了一个名为 ConsumerEndpointFactoryBean
的 FactoryBean
,它根据通道类型创建适当的消费者类型。
此外,Spring Integration 具有完整的 XML 命名空间支持,以进一步隐藏这些细节。
本指南中介绍了基于命名空间的配置,每个组件类型都会在引入时进行介绍。
许多 |
端点命名空间支持
在本参考手册中,您可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等。
其中大多数支持 input-channel
属性,许多支持 output-channel
属性。
解析后,这些端点元素会生成 PollingConsumer
或 EventDrivenConsumer
实例,具体取决于所引用 input-channel
的类型:分别是 PollableChannel
或 SubscribableChannel
。
当通道是可轮询的时,轮询行为基于端点元素的 poller
子元素及其属性。
以下列出了 poller
的所有可用配置选项:
<int:poller cron="" [id="CO1-1"]1
default="false" [id="CO1-2"]2
error-channel="" [id="CO1-3"]3
fixed-delay="" [id="CO1-4"]4
fixed-rate="" [id="CO1-5"]5
initial-delay="" [id="CO1-6"]6
id="" [id="CO1-7"]7
max-messages-per-poll="" [id="CO1-8"]8
receive-timeout="" [id="CO1-9"]9
ref="" [id="CO1-10"]10
task-executor="" [id="CO1-11"]11
time-unit="MILLISECONDS" [id="CO1-12"]12
trigger=""> [id="CO1-13"]13
<int:advice-chain /> [id="CO1-14"]14
<int:transactional /> [id="CO1-15"]15
</int:poller>
<1> 提供使用 Cron 表达式配置轮询器的能力。 底层实现使用 `org.springframework.scheduling.support.CronTrigger`。 如果设置此属性,则不得指定以下任何属性:`fixed-delay`、`trigger`、`fixed-rate` 和 `ref`。 <2> 通过将此属性设置为 `true`,您可以定义一个且仅一个全局默认轮询器。 如果在应用程序上下文中定义了多个默认轮询器,则会引发异常。 任何连接到 `PollableChannel` (`PollingConsumer`) 的端点或任何没有显式配置轮询器的 `SourcePollingChannelAdapter` 都会使用全局默认轮询器。 它默认为 `false`。 可选。 <3> 标识当此轮询器调用失败时发送错误消息的通道。 要完全抑制异常,您可以提供对 `nullChannel` 的引用。 可选。 <4> 固定延迟触发器在底层使用 `PeriodicTrigger`。 数值以 `time-unit` 为单位,或可以是持续时间格式(从 6.2 版开始),例如 `PT10S`、`P1D`。 如果设置此属性,则不得指定以下任何属性:`fixed-rate`、`trigger`、`cron` 和 `ref`。 <5> 固定速率触发器在底层使用 `PeriodicTrigger`。 数值以 `time-unit` 为单位,或可以是持续时间格式(从 6.2 版开始),例如 `PT10S`、`P1D`。 如果设置此属性,则不得指定以下任何属性:`fixed-delay`、`trigger`、`cron` 和 `ref`。 <6> 底层 `PeriodicTrigger` 的初始延迟(从 6.2 版开始)。 数值以 `time-unit` 为单位,或可以是持续时间格式,例如 `PT10S`、`P1D`。 <7> 引用轮询器底层 bean 定义的 ID,该定义类型为 `org.springframework.integration.scheduling.PollerMetadata`。 对于顶级轮询器元素,`id` 属性是必需的,除非它是默认轮询器 (`default="true"`)。 <8> 有关详细信息,请参阅 xref:channel-adapter.adoc#channel-adapter-namespace-inbound[配置入站通道适配器]。 如果未指定,默认值取决于上下文。 如果使用 `PollingConsumer`,此属性默认为 `-1`。 但是,如果使用 `SourcePollingChannelAdapter`,`max-messages-per-poll` 属性默认为 `1`。 可选。 <9> 值设置在底层类 `PollerMetadata` 上。 如果未指定,则默认为 1000(毫秒)。 可选。 <10> Bean 引用到另一个顶级轮询器。 `ref` 属性不得存在于顶级 `poller` 元素上。 但是,如果设置此属性,则不得指定以下任何属性:`fixed-rate`、`trigger`、`cron` 和 `fixed-delay`。 <11> 提供引用自定义任务执行器的能力。 有关进一步信息,请参阅 <<TaskExecutor Support,taskexecutor-support>>。 可选。 <12> 此属性指定底层 `org.springframework.scheduling.support.PeriodicTrigger` 上的 `java.util.concurrent.TimeUnit` 枚举值。 因此,此属性只能与 `fixed-delay` 或 `fixed-rate` 属性结合使用。 如果与 `cron` 或 `trigger` 引用属性结合使用,则会导致失败。 `PeriodicTrigger` 支持的最小粒度是毫秒。 因此,只有毫秒和秒可用。 如果未提供此值,则任何 `fixed-delay` 或 `fixed-rate` 值都将被解释为毫秒。 基本上,此枚举为基于秒的间隔触发器值提供了便利。 对于按小时、按天和按月设置,我们建议改用 `cron` 触发器。 <13> 引用任何实现 `org.springframework.scheduling.Trigger` 接口的 Spring 配置 bean。 但是,如果设置此属性,则不得指定以下任何属性:`fixed-delay`、`fixed-rate`、`cron` 和 `ref`。 可选。 <14> 允许指定额外的 AOP 建议来处理其他横切关注点。 有关详细信息,请参阅 xref:jms.adoc#jms-ob-transactions[事务]。 可选。 <15> 轮询器可以进行事务处理。 有关详细信息,请参阅 <<AOP Advice chains,aop-advice-chains>>。 可选。
示例
一个简单的基于间隔的轮询器,间隔为 1 秒,可以按如下方式配置:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用 fixed-rate
属性的替代方案,您还可以使用 fixed-delay
属性。
对于基于 Cron 表达式的轮询器,请改用 cron
属性,如以下示例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是 PollableChannel
,则需要轮询器配置。
具体来说,如前所述,trigger
是 PollingConsumer
类的必需属性。
因此,如果您省略轮询消费者端点配置的 poller
子元素,可能会抛出异常。
如果您尝试在连接到非轮询通道的元素上配置轮询器,也可能会抛出异常。
也可以创建顶级轮询器,在这种情况下,只需要一个 ref
属性,如以下示例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
|
全局默认轮询器
为了进一步简化配置,您可以定义一个全局默认轮询器。
XML DSL 中的单个顶级轮询器组件可以将 default
属性设置为 true
。
对于 Java 配置,在这种情况下必须声明一个名为 PollerMetadata.DEFAULT_POLLER
的 PollerMetadata
bean。
在这种情况下,任何在同一 ApplicationContext
中定义且其输入通道是 PollableChannel
且没有显式配置 poller
的端点都将使用该默认轮询器。
以下示例显示了这样的轮询器和使用它的转换器:
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事务支持
Spring Integration 还为轮询器提供了事务支持,以便每个接收和转发操作都可以作为原子工作单元执行。
要为轮询器配置事务,请添加 <transactional/>
子元素。
以下示例显示了可用的属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关详细信息,请参阅 轮询器事务支持。
AOP 建议链
由于 Spring 事务支持依赖于带有 TransactionInterceptor
(AOP Advice) 处理由轮询器启动的消息流事务行为的代理机制,因此有时您必须提供额外的建议来处理与轮询器相关的其他横切行为。
为此,poller
定义了一个 advice-chain
元素,允许您在实现 MethodInterceptor
接口的类中添加更多建议。
以下示例显示了如何为 poller
定义 advice-chain
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现 MethodInterceptor
接口的更多信息,请参阅 Spring Framework 参考指南的 AOP 部分。
建议链也可以应用于没有任何事务配置的轮询器,从而允许您增强由轮询器启动的消息流的行为。
当使用建议链时,不能指定 <transactional/>
子元素。
相反,声明一个 <tx:advice/>
bean 并将其添加到 <advice-chain/>
。
有关完整的配置详细信息,请参阅 轮询器事务支持。
TaskExecutor 支持
轮询线程可以由 Spring 的 TaskExecutor
抽象的任何实例执行。
这为端点或端点组启用了并发。
从 Spring 3.0 开始,核心 Spring Framework 具有 task
命名空间,其 <executor/>
元素支持创建简单的线程池执行器。
该元素接受常见并发设置的属性,例如池大小和队列容量。
配置线程池执行器可以显著影响端点在负载下的性能。
这些设置适用于每个端点,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期流量)。
要为使用 XML 命名空间支持配置的轮询端点启用并发,请在其 <poller/>
元素上提供 task-executor
引用,然后提供以下示例中显示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您不提供任务执行器,则在调用者的线程中调用消费者的处理程序。
请注意,调用者通常是默认的 TaskScheduler
(请参阅 配置任务调度器)。
您还应该记住,task-executor
属性可以通过指定 bean 名称来提供对 Spring TaskExecutor
接口的任何实现的引用。
前面显示的 executor
元素是为了方便而提供的。
如前所述,在 endpoint-pollingconsumer 中,您还可以以模拟事件驱动行为的方式配置轮询消费者。
通过长时间的接收超时和触发器中的短间隔,即使在轮询消息源上,您也可以确保对到达消息做出非常及时的反应。
请注意,这仅适用于具有带超时的阻塞等待调用的源。
例如,文件轮询器不会阻塞。
每个 receive()
调用会立即返回,其中包含新文件或不包含。
因此,即使轮询器包含一个长的 receive-timeout
,该值在这种情况下也永远不会被使用。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。
以下示例显示了轮询消费者如何几乎即时地接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会产生太多开销,因为在内部,它只不过是一个计时等待线程,它不需要像(例如)一个狂暴的无限 while 循环那样多的 CPU 资源使用。
运行时更改轮询速率
当使用 fixed-delay
或 fixed-rate
属性配置轮询器时,默认实现使用 PeriodicTrigger
实例。
PeriodicTrigger
是核心 Spring Framework 的一部分。
它只接受间隔作为构造函数参数。
因此,它不能在运行时更改。
但是,您可以定义自己的 org.springframework.scheduling.Trigger
接口实现。
您甚至可以使用 PeriodicTrigger
作为起点。
然后,您可以为间隔(周期)添加一个 setter,或者您甚至可以在触发器本身中嵌入自己的节流逻辑。
period
属性用于每次调用 nextExecutionTime
来调度下一次轮询。
要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用 trigger
属性将依赖项注入到轮询器配置中,该属性引用自定义触发器 bean 实例。
现在,您可以获取触发器 bean 的引用,并在轮询之间更改轮询间隔。
有关示例,请参阅 Spring Integration 示例 项目。
它包含一个名为 dynamic-poller
的示例,该示例使用自定义触发器并演示了在运行时更改轮询间隔的能力。
该示例提供了一个实现 org.springframework.scheduling.Trigger
接口的自定义触发器。
该示例的触发器基于 Spring 的 PeriodicTrigger
实现。
但是,自定义触发器的字段不是 final 的,并且属性具有显式的 getter 和 setter,允许您在运行时动态更改轮询周期。
然而,需要注意的是,由于 Trigger 方法是 |
负载类型转换
在本参考手册中,您还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任何任意 Object
作为输入参数。
在 Object
的情况下,此类参数被映射到消息有效负载或有效负载或消息头的一部分(当使用 Spring 表达式语言时)。
但是,端点方法的输入参数类型有时与有效负载或其部分的类型不匹配。
在这种情况下,我们需要执行类型转换。
Spring Integration 提供了一种方便的方式,用于在其自己的名为 integrationConversionService
的转换服务 bean 中注册类型转换器(通过使用 Spring ConversionService
)。
一旦使用 Spring Integration 基础设施定义了第一个转换器,该 bean 就会自动创建。
要注册转换器,您可以实现 org.springframework.core.convert.converter.Converter
、org.springframework.core.convert.converter.GenericConverter
或 org.springframework.core.convert.converter.ConverterFactory
。
Converter
实现最简单,它将一种类型转换为另一种类型。
为了更复杂的功能,例如转换为类层次结构,您可以实现 GenericConverter
,可能还有 ConditionalConverter
。
这些允许您完全访问 from
和 to
类型描述符,从而实现复杂的转换。
例如,如果您有一个名为 Something
的抽象类作为转换的目标(参数类型、通道数据类型等),并且您有两个名为 Thing1
和 Thing
的具体实现,并且您希望根据输入类型转换为其中一个,那么 GenericConverter
将是一个很好的选择。
有关详细信息,请参阅这些接口的 Javadoc:
当您实现了转换器后,您可以使用方便的命名空间支持注册它,如以下示例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用内部 bean,如以下示例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,您可以使用注解创建上述配置,如以下示例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用 @Configuration
注解,如以下示例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
配置应用程序上下文时,Spring Framework 允许您添加一个 conversionService
bean(请参阅 配置 ConversionService 章)。
此服务在需要时用于在 bean 创建和配置期间执行适当的转换。
相比之下,integrationConversionService
用于运行时转换。
这些用途截然不同。
用于连接 bean 构造函数参数和属性的转换器如果在运行时用于 Spring Integration 表达式评估(针对数据类型通道中的消息、有效负载类型转换器等)可能会产生意外结果。
但是,如果您确实想将 Spring conversionService
用作 Spring Integration integrationConversionService
,您可以在应用程序上下文中配置一个别名,如以下示例所示:
<alias name="conversionService" alias="integrationConversionService"/>
在这种情况下,conversionService
提供的转换器可用于 Spring Integration 运行时转换。
内容类型转换
从 5.0 版本开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod
基础设施。
其 HandlerMethodArgumentResolver
实现(例如 PayloadArgumentResolver
和 MessageMethodArgumentResolver
)可以使用 MessageConverter
抽象将传入的 payload
转换为目标方法参数类型。
转换可以基于 contentType
消息头。
为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter
,它委托给已注册的转换器列表,直到其中一个返回非空结果。
默认情况下,此转换器提供(严格按顺序):
-
MappingJackson2MessageConverter
如果 Jackson 处理器存在于 classpath 上
有关其用途和转换的适当 contentType
值的更多信息,请参阅 Javadoc(在前面的列表中链接)。
使用 ConfigurableCompositeMessageConverter
是因为它可以使用任何其他 MessageConverter
实现,包括或排除前面提到的默认转换器。
它还可以注册为应用程序上下文中的适当 bean,覆盖默认转换器,如以下示例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新的转换器在默认转换器之前注册到复合转换器中。
您也可以不使用 ConfigurableCompositeMessageConverter
,而是通过注册一个名为 integrationArgumentResolverMessageConverter
的 bean(通过设置 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
属性)来提供自己的 MessageConverter
。
当使用 SpEL 方法调用时,不支持基于 |
异步轮询
如果您希望轮询是异步的,轮询器可以选择指定一个 task-executor
属性,该属性指向任何 TaskExecutor
bean 的现有实例(Spring 3.0 通过 task
命名空间提供了方便的命名空间配置)。
但是,在配置带有 TaskExecutor
的轮询器时,您必须了解某些事情。
问题在于存在两种配置,即轮询器和 TaskExecutor
。
它们必须相互协调。
否则,您最终可能会造成人为的内存泄漏。
考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置演示了一个不协调的配置。
默认情况下,任务执行器具有无界任务队列。 即使所有线程都被阻塞,等待新消息到达或超时,轮询器也会继续调度新任务。 鉴于有 20 个线程执行具有五秒超时的任务,它们以每秒 4 个的速度执行。 但是,新任务以每秒 20 个的速度调度,因此任务执行器中的内部队列以每秒 16 个的速度增长(在进程空闲时),因此我们有内存泄漏。
处理此问题的一种方法是设置任务执行器的 queue-capacity
属性。
甚至 0 也是一个合理的值。
您还可以通过设置 Task Executor 的 rejection-policy
属性(例如,设置为 DISCARD
)来管理无法排队的消息。
换句话说,在配置 TaskExecutor
时,您必须了解某些细节。
有关该主题的更多详细信息,请参阅 Spring 参考手册中的 “任务执行和调度”。
端点内部 Bean
许多端点都是复合 bean。
这包括所有消费者和所有轮询入站通道适配器。
消费者(轮询或事件驱动)委托给 MessageHandler
。
轮询适配器通过委托给 MessageSource
获取消息。
通常,获取委托 bean 的引用很有用,可能用于在运行时更改配置或进行测试。
这些 bean 可以通过众所周知的名称从 ApplicationContext
中获取。
MessageHandler
实例在应用程序上下文中注册,其 bean ID 类似于 someConsumer.handler
(其中“consumer”是端点 id
属性的值)。
MessageSource
实例注册的 bean ID 类似于 somePolledAdapter.source
,其中“somePolledAdapter”是适配器的 ID。
前面的内容仅适用于框架组件本身。 您可以改为使用内部 bean 定义,如以下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 bean 被视为任何声明的内部 bean,并且未在应用程序上下文中注册。
如果您希望以其他方式访问此 bean,请在顶层使用 id
声明它,并改用 ref
属性。
有关详细信息,请参阅 Spring 文档。