消息发布

AOP(面向切面编程)消息发布功能允许你将消息作为方法调用的副产品来构建和发送。 例如,假设你有一个组件,并且每次此组件的状态更改时,你都希望通过消息获得通知。 发送此类通知最简单的方法是将消息发送到专用通道,但是你如何将更改对象状态的方法调用连接到消息发送过程,以及通知消息应该如何构造? AOP 消息发布功能通过配置驱动的方法处理这些职责。

消息发布配置

Spring Integration 提供了两种方法:XML 配置和注解驱动(Java)配置。

使用 @Publisher 注解进行注解驱动配置

注解驱动方法允许你使用 @Publisher 注解注释任何方法,以指定“channel”属性。 从版本 5.1 开始,要启用此功能,你必须在某个 @Configuration 类上使用 @EnablePublisher 注解。 有关更多信息,请参阅 配置和 @EnableIntegration。 消息是根据方法调用的返回值构建的,并发送到“channel”属性指定的通道。 为了进一步管理消息结构,你还可以结合使用 @Payload@Header 注解。

在内部,Spring Integration 的此消息发布功能通过定义 PublisherAnnotationAdvisor 和 Spring 表达式语言 (SpEL) 使用 Spring AOP,从而为你提供了极大的灵活性和对它发布 Message 结构的控制。

PublisherAnnotationAdvisor 定义并绑定以下变量:

  • #return: 绑定到返回值,允许你引用它或它的属性(例如,#return.something,其中“something”是绑定到 #return 的对象的属性)

  • #exception: 如果方法调用抛出异常,则绑定到异常

  • #args: 绑定到方法参数,以便你可以按名称提取单个参数(例如,#args.fname

考虑以下示例:

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

在前面的示例中,消息使用以下结构构建:

  • 消息负载是方法的返回类型和值。 这是默认值。

  • 新构建的消息发送到默认发布者通道,该通道使用注解后处理器(本节后面介绍)配置。

以下示例与前面的示例相同,只是它不使用默认发布通道:

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

我们没有使用默认发布通道,而是通过设置 @Publisher 注解的“channel”属性来指定发布通道。 我们还添加了 @Header 注解,这导致名为“last”的消息头具有与“lname”方法参数相同的值。 该头被添加到新构建的消息中。

以下示例与前面的示例几乎相同:

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

唯一的区别是我们对方法使用了 @Payload 注解,以明确指定方法的返回值应作为消息的负载。

以下示例通过在 @Payload 注解中使用 Spring 表达式语言进一步扩展了之前的配置,以进一步指示框架应如何构建消息:

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

在前面的示例中,消息是方法调用的返回值和“lname”输入参数的串联。 名为“x”的消息头的值由“num”输入参数确定。 该头被添加到新构建的消息中。

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

在前面的示例中,你看到了 @Payload 注解的另一种用法。 在这里,我们注解了一个方法参数,该参数成为新构建消息的负载。

与 Spring 中的大多数其他注解驱动功能一样,你需要注册一个后处理器 (PublisherAnnotationBeanPostProcessor)。 以下示例展示了如何执行此操作:

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

对于更简洁的配置,你可以改用命名空间支持,如以下示例所示:

<int:annotation-config>
    <int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>

对于 Java 配置,你必须使用 @EnablePublisher 注解,如以下示例所示:

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
    ...
}

从版本 5.1.3 开始,<int:enable-publisher> 组件以及 @EnablePublisher 注解具有 proxy-target-classorder 属性,用于调整 ProxyFactory 配置。

与其他 Spring 注解(@Component@Scheduled 等)类似,你也可以将 @Publisher 用作元注解。 这意味着你可以定义自己的注解,这些注解以与 @Publisher 本身相同的方式处理。 以下示例展示了如何执行此操作:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}

在前面的示例中,我们定义了 @Audit 注解,它本身用 @Publisher 注解。 另请注意,你可以在元注解上定义 channel 属性,以封装消息在此注解内部发送的位置。 现在你可以使用 @Audit 注解注释任何方法,如以下示例所示:

@Audit
public String test() {
    return "Hello";
}

在前面的示例中,test() 方法的每次调用都会生成一条消息,其负载由其返回值创建。 每条消息都发送到名为 auditChannel 的通道。 此技术的好处之一是你可以避免在多个注解中重复相同的通道名称。 你还可以在你自己的(可能是领域特定的)注解和框架提供的注解之间提供一个间接层。

你还可以注解类,这允许你将此注解的属性应用于该类的每个公共方法,如以下示例所示:

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}

使用 <publishing-interceptor> 元素的基于 XML 的方法

基于 XML 的方法允许你将相同的基于 AOP 的消息发布功能配置为 MessagePublishingInterceptor 的基于命名空间的配置。 它肯定比注解驱动的方法有一些优势,因为它允许你使用 AOP 切点表达式,从而可能一次拦截多个方法或拦截和发布你没有源代码的方法。

要使用 XML 配置消息发布,你只需执行以下两件事:

  • 使用 <publishing-interceptor> XML 元素为 MessagePublishingInterceptor 提供配置。

  • 提供 AOP 配置以将 MessagePublishingInterceptor 应用于托管对象。

以下示例展示了如何配置 publishing-interceptor 元素:

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" value="something"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" expression="'something'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

<publishing-interceptor> 配置看起来与基于注解的方法非常相似,它也使用了 Spring 表达式语言的强大功能。

在前面的示例中,testBeanecho 方法的执行会渲染一个具有以下结构的 Message

  • Message 负载是 String 类型,内容如下:Echoing: [value],其中 value 是执行方法返回的值。

  • Message 有一个名为 things 的头,其值为 something

  • Message 发送到 echoChannel

第二种方法与第一种方法非常相似。 在这里,每个以“repl”开头的方法都会渲染一个具有以下结构的 Message

  • Message 负载与前面的示例相同。

  • Message 有一个名为 things 的头,其值是 SpEL 表达式 ’something'.toUpperCase() 的结果。

  • Message 发送到 echoChannel

第二种方法,映射任何以 echoDef 开头的方法的执行,会生成一个具有以下结构的 Message

  • Message 负载是执行方法返回的值。

  • 由于未提供 channel 属性,因此 Message 发送到 publisher 定义的 defaultChannel

对于简单的映射规则,你可以依赖 publisher 默认值,如以下示例所示:

<publishing-interceptor id="anotherInterceptor"/>

前面的示例将与切点表达式匹配的每个方法的返回值映射到负载并发送到 default-channel。 如果你未指定 defaultChannel(如前面的示例所示),则消息将发送到全局 nullChannel(相当于 /dev/null)。

异步发布

发布与组件的执行发生在同一个线程中。 因此,默认情况下,它是同步的。 这意味着整个消息流必须等到发布者的流完成。 然而,开发人员通常希望完全相反:使用此消息发布功能来启动异步流。 例如,你可能托管一个服务(HTTP、WS 等),该服务接收远程请求。 你可能希望将此请求内部发送到一个可能需要一段时间才能完成的处理中。 但是,你也可能希望立即回复用户。 因此,你可以使用“output-channel”或“replyChannel”头将简单的确认式回复发送回调用者,而不是将入站请求发送到输出通道进行处理(传统方式),同时使用消息发布功能来启动复杂的流。

以下示例中的服务接收一个复杂的负载(需要进一步发送进行处理),但它也需要向调用者回复一个简单的确认:

public String echo(Object complexPayload) {
     return "ACK";
}

因此,我们不将复杂流连接到输出通道,而是使用消息发布功能。 我们将其配置为通过使用服务方法的输入参数(如前面示例所示)创建新消息,并将其发送到“localProcessChannel”。 为了确保此流是异步的,我们所需要做的就是将其发送到任何类型的异步通道(下一个示例中的 ExecutorChannel)。 以下示例展示了如何异步 publishing-interceptor

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleService" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

处理此类场景的另一种方法是使用线束。 请参阅 线束

根据计划触发器生成和发布消息

在前面的部分中,我们研究了消息发布功能,该功能将消息作为方法调用的副产品进行构建和发布。 但是,在这些情况下,你仍然负责调用方法。 Spring Integration 2.0 通过“inbound-channel-adapter”元素上的新 expression 属性增加了对计划消息生产者和发布者的支持。 你可以根据多个触发器进行计划,其中任何一个都可以在“poller”元素上配置。 目前,我们支持 cronfixed-ratefixed-delay 以及你实现并由“trigger”属性值引用的任何自定义触发器。

如前所述,对计划生产者和发布者的支持通过 <inbound-channel-adapter> XML 元素提供。 考虑以下示例:

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

前面的示例创建了一个入站通道适配器,该适配器构建一个 Message,其负载是 expression 属性中定义的表达式的结果。 每次发生 fixed-delay 属性指定的延迟时,都会创建并发送此类消息。

以下示例与前面的示例类似,只是它使用 fixed-rate 属性:

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

fixed-rate 属性允许你以固定速率(从每个任务的开始时间测量)发送消息。

以下示例展示了如何应用 Cron 触发器,其值在 cron 属性中指定:

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

以下示例展示了如何将附加头插入消息中:

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

附加消息头可以采用标量值或评估 Spring 表达式的结果。

如果你需要实现自己的自定义触发器,可以使用 trigger 属性提供对实现 org.springframework.scheduling.Trigger 接口的任何 Spring 配置 bean 的引用。 以下示例展示了如何执行此操作:

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>