注解支持

除了用于配置消息端点的 XML 命名空间支持之外,你还可以使用注解。 首先,Spring Integration 提供了类级别的 @MessageEndpoint 作为刻板印象注解,这意味着它本身使用 Spring 的 @Component 注解进行了注解,因此 Spring 的组件扫描会自动将其识别为 bean 定义。 更重要的是各种方法级别的注解。 它们指示被注解的方法能够处理消息。 以下示例演示了类级别和方法级别的注解:

@MessageEndpoint
public class FooService {

    @ServiceActivator
    public void processMessage(Message message) {
        ...
    }
}

方法“处理”消息的具体含义取决于特定的注解。 Spring Integration 中可用的注解包括:

如果你将 XML 配置与注解结合使用,则不需要 @MessageEndpoint 注解。 如果你想从 <service-activator/> 元素的 ref 属性配置 POJO 引用,你可以只提供方法级别的注解。 在这种情况下,即使 <service-activator/> 元素上没有方法级别属性,注解也能防止歧义。

在大多数情况下,被注解的处理方法不应要求 Message 类型作为其参数。 相反,方法参数类型可以匹配消息的负载类型,如以下示例所示:

public class ThingService {

    @ServiceActivator
    public void bar(Thing thing) {
        ...
    }

}

当方法参数应该从 MessageHeaders 中的值映射时,另一种选择是使用参数级别的 @Header 注解。 通常,使用 Spring Integration 注解注解的方法可以接受 Message 本身、消息负载或头部值(使用 @Header)作为参数。 实际上,该方法可以接受组合,如以下示例所示:

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
        ...
    }

}

你还可以使用 @Headers 注解将所有消息头部作为 Map 提供,如以下示例所示:

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

注解的值也可以是 SpEL 表达式(例如,someHeader.toUpperCase()),当你想在注入头部值之前对其进行操作时,这很有用。 它还提供了一个可选的 required 属性,该属性指定属性值是否必须在头部中可用。 required 属性的默认值为 true

对于其中几个注解,当消息处理方法返回非空值时,端点会尝试发送回复。 这在两种配置选项(命名空间和注解)中是一致的,即使用此类端点的输出通道(如果可用),并且 REPLY_CHANNEL 消息头部值用作回退。

端点上的输出通道和回复通道消息头部的组合实现了管道方法,其中多个组件具有输出通道,最后一个组件允许将回复消息转发到回复通道(如原始请求消息中指定)。 换句话说,最后一个组件依赖于原始发送者提供的信息,因此可以动态支持任意数量的客户端。 这是 返回地址 模式的一个示例。

除了此处显示的示例之外,这些注解还支持 inputChanneloutputChannel 属性,如以下示例所示:

@Service
public class ThingService {

    @ServiceActivator(inputChannel="input", outputChannel="output")
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

这些注解的处理创建的 bean 与相应的 XML 组件相同——AbstractEndpoint 实例和 MessageHandler 实例(或入站通道适配器的 MessageSource 实例)。 参见 @Bean 方法上的注解。 bean 名称是根据以下模式生成的:[componentName].[methodName].[decapitalizedAnnotationClassShortName]。 在前面的示例中,AbstractEndpoint 的 bean 名称是 thingService.otherThing.serviceActivator,而 MessageHandler (MessageSource) bean 的名称是相同的名称加上额外的 .handler (.source) 后缀。 可以使用 @EndpointId 注解以及这些消息注解来定制此类名称。 MessageHandler 实例(MessageSource 实例)也符合由 消息历史 跟踪的条件。 从版本 4.0 开始,所有消息注解都提供了 SmartLifecycle 选项(autoStartupphase),以允许在应用程序上下文初始化时控制端点生命周期。 它们分别默认为 true0。 要更改端点的状态(例如 start()stop()),你可以通过使用 BeanFactory(或自动装配)获取对端点 bean 的引用并调用方法。 或者,你可以向 控制总线 发送命令消息。 出于这些目的,你应该使用前面段落中提到的 beanName

在解析上述注解后自动创建的通道(当没有配置特定的通道 bean 时),以及相应的消费者端点,在上下文初始化接近尾声时声明为 bean。 这些 bean 可以 在其他服务中自动装配,但它们必须用 @Lazy 注解标记,因为这些定义通常在正常的自动装配处理期间尚不可用。

@Autowired
@Lazy
@Qualifier("someChannel")
MessageChannel someChannel;
...

@Bean
Thing1 dependsOnSPCA(@Qualifier("someInboundAdapter") @Lazy SourcePollingChannelAdapter someInboundAdapter) {
    ...
}

从版本 6.0 开始,所有消息注解现在都是 @Repeatable,因此可以在同一个服务方法上声明多个相同类型的注解,这意味着创建与这些重复注解一样多的端点:

@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
    return input.toUpperCase();
}

使用 @Poller 注解

在 Spring Integration 4.0 之前,消息注解要求 inputChannelSubscribableChannel 的引用。 对于 PollableChannel 实例,需要一个 <int:bridge/> 元素来配置一个 <int:poller/> 并使复合端点成为 PollingConsumer。 版本 4.0 引入了 @Poller 注解,允许直接在消息注解上配置 poller 属性,如以下示例所示:

public class AnnotationService {

    @Transformer(inputChannel = "input", outputChannel = "output",
        poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
    public String handle(String payload) {
        ...
    }
}

@Poller 注解仅提供简单的 PollerMetadata 选项。 你可以使用属性占位符配置 @Poller 注解的属性(maxMessagesPerPollfixedDelayfixedRatecron)。 此外,从版本 5.1 开始,还提供了 PollingConsumerreceiveTimeout 选项。 如果需要提供更多轮询选项(例如,transactionadvice-chainerror-handler 等),你应该将 PollerMetadata 配置为通用 bean,并将其 bean 名称用作 @Pollervalue 属性。 在这种情况下,不允许其他属性(它们必须在 PollerMetadata bean 上指定)。 请注意,如果 inputChannelPollableChannel 并且未配置 @Poller,则使用默认的 PollerMetadata(如果它存在于应用程序上下文中)。 要使用 @Configuration 注解声明默认轮询器,请使用类似于以下示例的代码:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

以下示例演示了如何使用默认轮询器:

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
    public String handle(String payload) {
        ...
    }
}

以下示例演示了如何使用命名轮询器:

@Bean
public PollerMetadata myPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(1000));
    return pollerMetadata;
}

以下示例演示了使用默认轮询器的端点:

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
                           poller = @Poller("myPoller"))
    public String handle(String payload) {
         ...
    }
}

从版本 4.3.3 开始,@Poller 注解具有 errorChannel 属性,以便更轻松地配置底层 MessagePublishingErrorHandler。 此属性的作用与 <poller> XML 组件中的 error-channel 相同。 有关更多信息,请参见 端点命名空间支持

消息注解上的 poller() 属性与 reactive() 属性互斥。 有关更多信息,请参见下一节。

使用 @Reactive 注解

ReactiveStreamsConsumer 自版本 5.0 以来一直存在,但它仅在端点的输入通道是 FluxMessageChannel(或任何 org.reactivestreams.Publisher 实现)时才应用。 从版本 5.3 开始,当目标消息处理器是 ReactiveMessageHandler 时,无论输入通道类型如何,框架也会创建其实例。 从版本 5.5 开始,所有消息注解都引入了 @Reactive 子注解(类似于上面提到的 @Poller)。 它接受一个可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> bean 引用,并且无论输入通道类型和消息处理器如何,都将目标端点转换为 ReactiveStreamsConsumer 实例。 该函数用于 Flux.transform() 运算符,以对来自输入通道的响应式流源应用一些自定义(publishOn()doOnNext()log()retry() 等)。

以下示例演示了如何独立于最终订阅者和生产者将发布线程从输入通道更改为 DirectChannel

@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
    return flux -> flux.publishOn(Schedulers.parallel());
}

@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
    ...
}

消息注解上的 reactive() 属性与 poller() 属性互斥。 有关更多信息,请参见 使用 @Poller 注解响应式流支持

使用 @InboundChannelAdapter 注解

版本 4.0 引入了 @InboundChannelAdapter 方法级注解。 它基于被注解方法的 MethodInvokingMessageSource 生成一个 SourcePollingChannelAdapter 集成组件。 此注解是 <int:inbound-channel-adapter> XML 组件的模拟,并具有相同的限制:方法不能有参数,并且返回类型不能是 void。 它有两个属性:value(必需的 MessageChannel bean 名称)和 poller(可选的 @Poller 注解,如 前面所述)。 如果你需要提供一些 MessageHeaders,请使用 Message<?> 返回类型并使用 MessageBuilder 构建 Message<?>。 使用 MessageBuilder 可以配置 MessageHeaders。 以下示例演示了如何使用 @InboundChannelAdapter 注解:

@InboundChannelAdapter("counterChannel")
public Integer count() {
    return this.counter.incrementAndGet();
}

@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
    return "foo";
}

版本 4.3 引入了 channel 作为 value 注解属性的别名,以提供更好的源代码可读性。 此外,目标 MessageChannel bean 在 SourcePollingChannelAdapter 中由提供的名称(通过 outputChannelName 选项设置)在第一次 receive() 调用时解析,而不是在初始化阶段。 它允许“延迟绑定”逻辑:从消费者角度来看,目标 MessageChannel bean 在 @InboundChannelAdapter 解析阶段之后稍晚创建和注册。

第一个示例要求默认轮询器已在应用程序上下文中的其他位置声明。

使用 @MessagingGateway 注解

使用 @IntegrationComponentScan 注解

标准的 Spring Framework @ComponentScan 注解不扫描接口以查找刻板印象 @Component 注解。 为了克服此限制并允许配置 @MessagingGateway(参见 @MessagingGateway 注解),我们引入了 @IntegrationComponentScan 机制。 此注解必须与 @Configuration 注解一起放置并进行自定义以定义其扫描选项, 例如 basePackagesbasePackageClasses。 在这种情况下,所有发现的用 @MessagingGateway 注解的接口都将被解析并注册为 GatewayProxyFactoryBean 实例。 所有其他基于类的组件都由标准 @ComponentScan 解析。