注解支持
除了用于配置消息端点的 XML 命名空间支持之外,你还可以使用注解。
首先,Spring Integration 提供了类级别的 @MessageEndpoint
作为刻板印象注解,这意味着它本身使用 Spring 的 @Component
注解进行了注解,因此 Spring 的组件扫描会自动将其识别为 bean 定义。
更重要的是各种方法级别的注解。
它们指示被注解的方法能够处理消息。
以下示例演示了类级别和方法级别的注解:
@MessageEndpoint
public class FooService {
@ServiceActivator
public void processMessage(Message message) {
...
}
}
方法“处理”消息的具体含义取决于特定的注解。 Spring Integration 中可用的注解包括:
-
@Aggregator
(参见 聚合器) -
@Filter
(参见 过滤器) -
@Router
(参见 路由) -
@ServiceActivator
(参见 服务激活器) -
@Splitter
(参见 拆分器) -
@Transformer
(参见 转换器) -
@InboundChannelAdapter
(参见 通道适配器) -
@BridgeFrom
(参见 使用 Java 配置配置桥接) -
@BridgeTo
(参见 使用 Java 配置配置桥接) -
@MessagingGateway
(参见 消息网关) -
@IntegrationComponentScan
(参见 配置和@EnableIntegration
)
如果你将 XML 配置与注解结合使用,则不需要 |
在大多数情况下,被注解的处理方法不应要求 Message
类型作为其参数。
相反,方法参数类型可以匹配消息的负载类型,如以下示例所示:
public class ThingService {
@ServiceActivator
public void bar(Thing thing) {
...
}
}
当方法参数应该从 MessageHeaders
中的值映射时,另一种选择是使用参数级别的 @Header
注解。
通常,使用 Spring Integration 注解注解的方法可以接受 Message
本身、消息负载或头部值(使用 @Header
)作为参数。
实际上,该方法可以接受组合,如以下示例所示:
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
...
}
}
你还可以使用 @Headers
注解将所有消息头部作为 Map
提供,如以下示例所示:
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
注解的值也可以是 SpEL 表达式(例如, |
对于其中几个注解,当消息处理方法返回非空值时,端点会尝试发送回复。
这在两种配置选项(命名空间和注解)中是一致的,即使用此类端点的输出通道(如果可用),并且 REPLY_CHANNEL
消息头部值用作回退。
端点上的输出通道和回复通道消息头部的组合实现了管道方法,其中多个组件具有输出通道,最后一个组件允许将回复消息转发到回复通道(如原始请求消息中指定)。 换句话说,最后一个组件依赖于原始发送者提供的信息,因此可以动态支持任意数量的客户端。 这是 返回地址 模式的一个示例。 |
除了此处显示的示例之外,这些注解还支持 inputChannel
和 outputChannel
属性,如以下示例所示:
@Service
public class ThingService {
@ServiceActivator(inputChannel="input", outputChannel="output")
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
这些注解的处理创建的 bean 与相应的 XML 组件相同——AbstractEndpoint
实例和 MessageHandler
实例(或入站通道适配器的 MessageSource
实例)。
参见 在 @Bean
方法上的注解。
bean 名称是根据以下模式生成的:[componentName].[methodName].[decapitalizedAnnotationClassShortName]
。
在前面的示例中,AbstractEndpoint
的 bean 名称是 thingService.otherThing.serviceActivator
,而 MessageHandler
(MessageSource
) bean 的名称是相同的名称加上额外的 .handler
(.source
) 后缀。
可以使用 @EndpointId
注解以及这些消息注解来定制此类名称。
MessageHandler
实例(MessageSource
实例)也符合由 消息历史 跟踪的条件。
从版本 4.0 开始,所有消息注解都提供了 SmartLifecycle
选项(autoStartup
和 phase
),以允许在应用程序上下文初始化时控制端点生命周期。
它们分别默认为 true
和 0
。
要更改端点的状态(例如 start()
或 stop()
),你可以通过使用 BeanFactory
(或自动装配)获取对端点 bean 的引用并调用方法。
或者,你可以向 控制总线 发送命令消息。
出于这些目的,你应该使用前面段落中提到的 beanName
。
在解析上述注解后自动创建的通道(当没有配置特定的通道 bean 时),以及相应的消费者端点,在上下文初始化接近尾声时声明为 bean。
这些 bean 可以 在其他服务中自动装配,但它们必须用 @Lazy
注解标记,因为这些定义通常在正常的自动装配处理期间尚不可用。
@Autowired
@Lazy
@Qualifier("someChannel")
MessageChannel someChannel;
...
@Bean
Thing1 dependsOnSPCA(@Qualifier("someInboundAdapter") @Lazy SourcePollingChannelAdapter someInboundAdapter) {
...
}
从版本 6.0 开始,所有消息注解现在都是 @Repeatable
,因此可以在同一个服务方法上声明多个相同类型的注解,这意味着创建与这些重复注解一样多的端点:
@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
return input.toUpperCase();
}
使用 @Poller
注解
在 Spring Integration 4.0 之前,消息注解要求 inputChannel
是 SubscribableChannel
的引用。
对于 PollableChannel
实例,需要一个 <int:bridge/>
元素来配置一个 <int:poller/>
并使复合端点成为 PollingConsumer
。
版本 4.0 引入了 @Poller
注解,允许直接在消息注解上配置 poller
属性,如以下示例所示:
public class AnnotationService {
@Transformer(inputChannel = "input", outputChannel = "output",
poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
public String handle(String payload) {
...
}
}
@Poller
注解仅提供简单的 PollerMetadata
选项。
你可以使用属性占位符配置 @Poller
注解的属性(maxMessagesPerPoll
、fixedDelay
、fixedRate
和 cron
)。
此外,从版本 5.1 开始,还提供了 PollingConsumer
的 receiveTimeout
选项。
如果需要提供更多轮询选项(例如,transaction
、advice-chain
、error-handler
等),你应该将 PollerMetadata
配置为通用 bean,并将其 bean 名称用作 @Poller
的 value
属性。
在这种情况下,不允许其他属性(它们必须在 PollerMetadata
bean 上指定)。
请注意,如果 inputChannel
是 PollableChannel
并且未配置 @Poller
,则使用默认的 PollerMetadata
(如果它存在于应用程序上下文中)。
要使用 @Configuration
注解声明默认轮询器,请使用类似于以下示例的代码:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
以下示例演示了如何使用默认轮询器:
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
public String handle(String payload) {
...
}
}
以下示例演示了如何使用命名轮询器:
@Bean
public PollerMetadata myPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(1000));
return pollerMetadata;
}
以下示例演示了使用默认轮询器的端点:
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
poller = @Poller("myPoller"))
public String handle(String payload) {
...
}
}
从版本 4.3.3 开始,@Poller
注解具有 errorChannel
属性,以便更轻松地配置底层 MessagePublishingErrorHandler
。
此属性的作用与 <poller>
XML 组件中的 error-channel
相同。
有关更多信息,请参见 端点命名空间支持。
消息注解上的 poller()
属性与 reactive()
属性互斥。
有关更多信息,请参见下一节。
使用 @Reactive
注解
ReactiveStreamsConsumer
自版本 5.0 以来一直存在,但它仅在端点的输入通道是 FluxMessageChannel
(或任何 org.reactivestreams.Publisher
实现)时才应用。
从版本 5.3 开始,当目标消息处理器是 ReactiveMessageHandler
时,无论输入通道类型如何,框架也会创建其实例。
从版本 5.5 开始,所有消息注解都引入了 @Reactive
子注解(类似于上面提到的 @Poller
)。
它接受一个可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
bean 引用,并且无论输入通道类型和消息处理器如何,都将目标端点转换为 ReactiveStreamsConsumer
实例。
该函数用于 Flux.transform()
运算符,以对来自输入通道的响应式流源应用一些自定义(publishOn()
、doOnNext()
、log()
、retry()
等)。
以下示例演示了如何独立于最终订阅者和生产者将发布线程从输入通道更改为 DirectChannel
:
@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
return flux -> flux.publishOn(Schedulers.parallel());
}
@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
...
}
消息注解上的 reactive()
属性与 poller()
属性互斥。
有关更多信息,请参见 使用 @Poller
注解 和 响应式流支持。
使用 @InboundChannelAdapter
注解
版本 4.0 引入了 @InboundChannelAdapter
方法级注解。
它基于被注解方法的 MethodInvokingMessageSource
生成一个 SourcePollingChannelAdapter
集成组件。
此注解是 <int:inbound-channel-adapter>
XML 组件的模拟,并具有相同的限制:方法不能有参数,并且返回类型不能是 void
。
它有两个属性:value
(必需的 MessageChannel
bean 名称)和 poller
(可选的 @Poller
注解,如 前面所述)。
如果你需要提供一些 MessageHeaders
,请使用 Message<?>
返回类型并使用 MessageBuilder
构建 Message<?>
。
使用 MessageBuilder
可以配置 MessageHeaders
。
以下示例演示了如何使用 @InboundChannelAdapter
注解:
@InboundChannelAdapter("counterChannel")
public Integer count() {
return this.counter.incrementAndGet();
}
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
return "foo";
}
版本 4.3 引入了 channel
作为 value
注解属性的别名,以提供更好的源代码可读性。
此外,目标 MessageChannel
bean 在 SourcePollingChannelAdapter
中由提供的名称(通过 outputChannelName
选项设置)在第一次 receive()
调用时解析,而不是在初始化阶段。
它允许“延迟绑定
”逻辑:从消费者角度来看,目标 MessageChannel
bean 在 @InboundChannelAdapter
解析阶段之后稍晚创建和注册。
第一个示例要求默认轮询器已在应用程序上下文中的其他位置声明。
使用 @MessagingGateway
注解
使用 @IntegrationComponentScan
注解
标准的 Spring Framework @ComponentScan
注解不扫描接口以查找刻板印象 @Component
注解。
为了克服此限制并允许配置 @MessagingGateway
(参见 @MessagingGateway
注解),我们引入了 @IntegrationComponentScan
机制。
此注解必须与 @Configuration
注解一起放置并进行自定义以定义其扫描选项,
例如 basePackages
和 basePackageClasses
。
在这种情况下,所有发现的用 @MessagingGateway
注解的接口都将被解析并注册为 GatewayProxyFactoryBean
实例。
所有其他基于类的组件都由标准 @ComponentScan
解析。