Spring Integration 框架概述
Spring Integration 提供了 Spring 编程模型的扩展,以支持众所周知的 企业集成模式。 它支持基于 Spring 的应用程序中的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。 这些适配器在 Spring 对远程处理、消息传递和调度的支持之上提供了更高级别的抽象。 Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持对于生成可维护、可测试代码至关重要的关注点分离。
Spring Integration 概述
本章高级别介绍了 Spring Integration 的核心概念和组件。 它包含了一些编程技巧,可帮助您充分利用 Spring Integration。
背景
Spring Framework 的关键主题之一是控制反转 (IoC)。 从广义上讲,这意味着框架代表其上下文中管理的组件处理职责。 组件本身得到简化,因为它们摆脱了这些职责。 例如,依赖注入将组件从定位或创建其依赖项的职责中解脱出来。 同样,面向切面编程通过将通用横切关注点模块化为可重用切面,从而将业务组件从这些关注点中解脱出来。 在每种情况下,最终结果都是一个更易于测试、理解、维护和扩展的系统。
此外,Spring Framework 和产品组合为构建企业应用程序提供了全面的编程模型。 开发人员受益于该模型的一致性,尤其是它基于公认的最佳实践,例如面向接口编程和优先选择组合而非继承。 Spring 简化的抽象和强大的支持库提高了开发人员的生产力,同时提高了可测试性和可移植性。
Spring Integration 也受这些相同目标和原则的驱动。 它将 Spring 编程模型扩展到消息传递领域,并建立在 Spring 现有的企业集成支持之上,以提供更高层次的抽象。 它支持消息驱动的架构,其中控制反转适用于运行时关注点,例如何时应运行某些业务逻辑以及应将响应发送到何处。 它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。 换句话说,消息传递和集成关注点由框架处理。 业务组件进一步与基础设施隔离,开发人员也摆脱了复杂的集成职责。
作为 Spring 编程模型的扩展,Spring Integration 提供了多种配置选项,包括注解、带命名空间支持的 XML、带通用 “bean” 元素的 XML 以及直接使用底层 API。 该 API 基于定义良好的策略接口和非侵入式、委托式适配器。 Spring Integration 的设计灵感来源于对 Spring 内部常见模式与 Gregor Hohpe 和 Bobby Woolf (Addison Wesley, 2004) 在 Enterprise Integration Patterns 中描述的著名模式之间存在强烈亲和性的认识。 阅读过该书的开发人员应该会立即熟悉 Spring Integration 的概念和术语。
目标和原则
Spring Integration 的目标如下:
-
为实现复杂的企业集成解决方案提供一个简单的模型。
-
促进基于 Spring 的应用程序中的异步、消息驱动行为。
-
促进现有 Spring 用户直观、渐进地采用。
Spring Integration 遵循以下原则:
-
组件应松散耦合,以实现模块化和可测试性。
-
框架应强制业务逻辑和集成逻辑之间的关注点分离。
-
扩展点本质上应是抽象的(但在定义良好的边界内),以促进重用和可移植性。
主要组件
从垂直角度看,分层架构有助于关注点分离,层之间的基于接口的契约促进了松散耦合。 基于 Spring 的应用程序通常以这种方式设计,Spring 框架和产品组合为企业应用程序的整个堆栈遵循此最佳实践提供了坚实的基础。 消息驱动架构增加了横向视角,但这些相同的目标仍然相关。 正如 “分层架构” 是一个极其通用和抽象的范式一样,消息传递系统通常遵循类似的抽象 “管道和过滤器” 模型。 “过滤器” 代表任何能够生成或消费消息的组件,“管道” 在过滤器之间传输消息,以便组件本身保持松散耦合。 重要的是要注意,这两个高级范式并非相互排斥。 支持 “管道” 的底层消息传递基础设施仍应封装在合同定义为接口的层中。 同样,“过滤器” 本身应在逻辑上位于应用程序服务层之上的层中进行管理,以与 Web 层大致相同的方式通过接口与这些服务进行交互。
消息
在 Spring Integration 中,消息是任何 Java 对象的通用包装器,并结合了框架在处理该对象时使用的元数据。 它由一个有效载荷和头部组成。 有效载荷可以是任何类型,头部则包含常用的信息,如 ID、时间戳、关联 ID 和返回地址。 头部也用于向连接的传输传递值和从连接的传输接收值。 例如,当从接收到的文件创建消息时,文件名可以存储在头部中,供下游组件访问。 同样,如果消息的内容最终要由出站邮件适配器发送,则各种属性(收件人、发件人、抄送、主题等)可以由上游组件配置为消息头部值。 开发人员还可以在头部中存储任何任意的键值对。

消息通道
消息通道代表了管道和过滤器架构的 “管道”。 生产者向通道发送消息,消费者从通道接收消息。 因此,消息通道解耦了消息组件,并为消息的拦截和监控提供了一个方便的入口点。

消息通道可以遵循点对点或发布-订阅语义。 对于点对点通道,发送到通道的每条消息最多只能有一个消费者接收。 另一方面,发布-订阅通道尝试将每条消息广播给通道上的所有订阅者。 Spring Integration 支持这两种模型。
虽然 “点对点” 和 “发布-订阅” 定义了最终有多少消费者接收每条消息的两个选项,但还有一个重要的考虑因素:通道是否应该缓冲消息? 在 Spring Integration 中,可轮询通道能够将消息缓冲在队列中。 缓冲的优点是它允许限制入站消息,从而防止消费者过载。 然而,顾名思义,这也会增加一些复杂性,因为消费者只有在配置了轮询器的情况下才能从这样的通道接收消息。 另一方面,连接到可订阅通道的消费者只是消息驱动的。 消息通道实现 详细讨论了 Spring Integration 中可用的各种通道实现。
消息端点
Spring Integration 的主要目标之一是通过控制反转来简化企业集成解决方案的开发。 这意味着您无需直接实现消费者和生产者,甚至无需构建消息并在消息通道上调用发送或接收操作。 相反,您应该能够专注于您的特定领域模型,其实现基于普通对象。 然后,通过提供声明式配置,您可以将您的领域特定代码 “连接” 到 Spring Integration 提供的消息传递基础设施。 负责这些连接的组件是消息端点。 这并不意味着您应该直接连接现有的应用程序代码。 任何真实的企业集成解决方案都需要一定量的代码专注于集成关注点,例如路由和转换。 重要的是实现集成逻辑和业务逻辑之间的关注点分离。 换句话说,与 Web 应用程序的 Model-View-Controller (MVC) 范式一样,目标应该是提供一个薄而专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。 下一节概述了处理这些职责的消息端点类型,在接下来的章节中,您将看到 Spring Integration 的声明式配置选项如何提供一种非侵入式的方式来使用这些端点。
消息端点
消息端点代表了管道和过滤器架构的 “过滤器”。 如前所述,端点的主要作用是将应用程序代码连接到消息传递框架,并以非侵入式的方式进行。 换句话说,应用程序代码理想情况下不应该知道消息对象或消息通道。 这类似于 MVC 范式中控制器的作用。 正如控制器处理 HTTP 请求一样,消息端点处理消息。 正如控制器映射到 URL 模式一样,消息端点映射到消息通道。 两者的目标相同:将应用程序代码与基础设施隔离。 这些概念以及所有后续模式在 Enterprise Integration Patterns 一书中都有详细讨论。 在此,我们仅对 Spring Integration 支持的主要端点类型及其相关角色进行高级描述。 接下来的章节将详细阐述并提供示例代码以及配置示例。
消息转换器
消息转换器负责转换消息的内容或结构并返回修改后的消息。
最常见的转换器类型可能是将消息的有效载荷从一种格式转换为另一种格式(例如从 XML 转换为 java.lang.String
)。
同样,转换器可以添加、删除或修改消息的头部值。
消息过滤器
消息过滤器决定消息是否应该被传递到输出通道。
这只需要一个布尔测试方法,该方法可以检查特定的有效载荷内容类型、属性值、头部的存在或其他条件。
如果消息被接受,它将被发送到输出通道。
如果不是,它将被丢弃(或者,对于更严格的实现,可能会抛出 Exception
)。
消息过滤器通常与发布-订阅通道结合使用,其中多个消费者可能会收到同一条消息,并使用过滤器的条件来缩小要处理的消息集。
请注意不要混淆管道和过滤器架构模式中 “过滤器” 的通用用法与这种特定的端点类型,后者选择性地缩小了两个通道之间流动的消息范围。 “过滤器” 的管道和过滤器概念与 Spring Integration 的消息端点更匹配:任何可以连接到消息通道以发送或接收消息的组件。 |
消息路由
消息路由负责决定接下来哪个通道或哪些通道(如果有)应该接收消息。 通常,该决定基于消息的内容或消息头部中可用的元数据。 消息路由通常用作服务激活器或其他能够发送回复消息的端点上静态配置的输出通道的动态替代方案。 同样,消息路由为前面描述的多个订阅者使用的反应式消息过滤器提供了主动替代方案。

聚合器
聚合器基本上是分割器的镜像,它是一种消息端点,接收多条消息并将它们组合成一条消息。
事实上,聚合器通常是包含分割器的管道中的下游消费者。
从技术上讲,聚合器比分割器更复杂,因为它需要维护状态(要聚合的消息),决定何时所有消息都可用,并在必要时进行超时。
此外,如果发生超时,聚合器需要知道是发送部分结果、丢弃它们,还是将它们发送到单独的通道。
Spring Integration 提供了 CorrelationStrategy
、ReleaseStrategy
以及可配置的超时设置、是否在超时时发送部分结果以及丢弃通道。
服务激活器
服务激活器是用于将服务实例连接到消息系统的通用端点。 必须配置输入消息通道,并且,如果要调用的服务方法能够返回值,也可以提供输出消息通道。
输出通道是可选的,因为每条消息也可以提供自己的“返回地址”头部。 此规则适用于所有消费者端点。 |
服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的有效载荷并进行转换(如果方法不期望消息类型参数)。 当服务对象的方法返回一个值时,该返回值也会在必要时转换为回复消息(如果它不是消息类型)。 该回复消息被发送到输出通道。 如果未配置输出通道,则回复将发送到消息的 “返回地址” 中指定的通道(如果可用)。
请求-回复服务激活器端点将目标对象的方法连接到输入和输出消息通道。

如前所述,在 overview-components-channel 中,通道可以是可轮询的或可订阅的。 在上述图中,这由 “时钟” 符号和实心箭头(轮询)以及虚线箭头(订阅)表示。 |
通道适配器
通道适配器是将消息通道连接到其他系统或传输的端点。 通道适配器可以是入站或出站的。 通常,通道适配器在消息与从其他系统(文件、HTTP 请求、JMS 消息等)接收或发送到其他系统的任何对象或资源之间进行一些映射。 根据传输方式,通道适配器还可以填充或提取消息头部值。 Spring Integration 提供了许多通道适配器,这些适配器将在后续章节中进行描述。

MessageChannel
。
消息源可以是可轮询的(例如,POP3)或消息驱动的(例如,IMAP Idle)。 在上述图中,这由 “时钟” 符号和实心箭头(轮询)以及虚线箭头(消息驱动)表示。 |

MessageChannel
连接到目标系统。
如前所述,在 overview-components-channel 中,通道可以是可轮询的或可订阅的。 在上述图中,这由 “时钟” 符号和实心箭头(轮询)以及虚线箭头(订阅)表示。 |
端点 Bean 名称
消费端点(任何带有 inputChannel
的端点)由两个 bean 组成:消费者和消息处理器。
消费者引用消息处理器,并在消息到达时调用它。
考虑以下 XML 示例:
<int:service-activator id = "someService" ... />
根据上述示例,bean 名称如下:
-
消费者:
someService
(id
) -
处理器:
someService.handler
当使用企业集成模式(EIP)注解时,名称取决于几个因素。 考虑以下注解 POJO 的示例:
@Component
public class SomeComponent {
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
根据上述示例,bean 名称如下:
-
消费者:
someComponent.someMethod.serviceActivator
-
处理器:
someComponent.someMethod.serviceActivator.handler
从 5.0.4 版开始,您可以使用 @EndpointId
注解修改这些名称,如下例所示:
@Component
public class SomeComponent {
@EndpointId("someService")
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
根据上述示例,bean 名称如下:
-
消费者:
someService
-
处理器:
someService.handler
@EndpointId
创建的名称与 XML 配置中的 id
属性创建的名称相同。
考虑以下注解 bean 的示例:
@Configuration
public class SomeConfiguration {
@Bean
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
根据上述示例,bean 名称如下:
-
消费者:
someConfiguration.someHandler.serviceActivator
-
处理器:
someHandler
(@Bean
名称)
从 5.0.4 版开始,您可以使用 @EndpointId
注解修改这些名称,如下例所示:
@Configuration
public class SomeConfiguration {
@Bean("someService.handler") [id="CO1-1"]1
@EndpointId("someService") [id="CO1-2"]2
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
<1> 处理器:`someService.handler`(bean 名称) <1> 消费者:`someService`(端点 ID)
@EndpointId
注解创建的名称与 XML 配置中的 id
属性创建的名称相同,只要您遵循将 .handler
附加到 @Bean
名称的约定。
在一种特殊情况下会创建第三个 bean:出于架构原因,如果 MessageHandler
@Bean
未定义 AbstractReplyProducingMessageHandler
,则框架会将提供的 bean 包装在 ReplyProducingMessageHandlerWrapper
中。
此包装器支持请求处理程序建议处理并发出正常的“未产生回复”调试日志消息。
其 bean 名称是处理程序 bean 名称加上 .wrapper
(当存在 @EndpointId
时 — 否则,它是正常的生成的处理程序名称)。
同样,可轮询消息源 会创建两个 bean:SourcePollingChannelAdapter
(SPCA) 和 MessageSource
。
考虑以下 XML 配置:
<int:inbound-channel-adapter id = "someAdapter" ... />
根据上述 XML 配置,bean 名称如下:
-
SPCA:
someAdapter
(id
) -
处理器:
someAdapter.source
考虑以下 POJO 的 Java 配置以定义 @EndpointId
:
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
...
}
根据上述 Java 配置示例,bean 名称如下:
-
SPCA:
someAdapter
-
处理器:
someAdapter.source
考虑以下 bean 的 Java 配置以定义 @EndpointID
:
@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
return () -> {
...
};
}
根据上述示例,bean 名称如下:
-
SPCA:
someAdapter
-
处理器:
someAdapter.source
(只要您遵循将.source
附加到@Bean
名称的约定)
配置和 @EnableIntegration
在本文档中,您将看到对 XML 命名空间支持的引用,用于在 Spring Integration 流中声明元素。
这种支持由一系列命名空间解析器提供,它们生成适当的 bean 定义来实现特定组件。
例如,许多端点由一个 MessageHandler
bean 和一个 ConsumerEndpointFactoryBean
组成,其中注入了处理器和输入通道名称。
首次遇到 Spring Integration 命名空间元素时,框架会自动声明许多 bean(任务调度器、隐式通道创建器等),这些 bean 用于支持运行时环境。
版本 4.0 引入了 @EnableIntegration
注解,以允许注册 Spring Integration 基础设施 bean(请参阅 Javadoc)。
当仅使用 Java 配置时,此注解是必需的——例如,在 Spring Boot 或 Spring Integration 消息注解支持和 Spring Integration Java DSL 中,没有 XML 集成配置。
当您有一个没有 Spring Integration 组件的父上下文和两个或多个使用 Spring Integration 的子上下文时,@EnableIntegration
注解也很有用。
它允许这些通用组件只在父上下文中声明一次。
@EnableIntegration
注解向应用程序上下文注册许多基础设施组件。
特别是,它:
-
注册一些内置 bean,例如
errorChannel
及其LoggingHandler
、用于轮询器的taskScheduler
、jsonPath
SpEL 函数等。 -
添加多个
BeanFactoryPostProcessor
实例以增强BeanFactory
,用于全局和默认集成环境。 -
添加多个
BeanPostProcessor
实例以增强或转换和包装特定 bean,用于集成目的。 -
添加注解处理器以解析消息注解并向应用程序上下文注册其组件。
@IntegrationComponentScan
注解还允许类路径扫描。
此注解扮演的角色类似于标准 Spring Framework @ComponentScan
注解,但它仅限于 Spring Integration 特有的组件和注解,而标准 Spring Framework 组件扫描机制无法触及这些组件和注解。
有关示例,请参阅 @MessagingGateway
注解。
@EnablePublisher
注解注册一个 PublisherAnnotationBeanPostProcessor
bean,并为那些没有 channel
属性的 @Publisher
注解配置 default-publisher-channel
。
如果找到多个 @EnablePublisher
注解,它们必须都具有相同的默认通道值。
有关更多信息,请参阅 带有 @Publisher
注解的注解驱动配置。
@GlobalChannelInterceptor
注解已引入,用于将 ChannelInterceptor
bean 标记为全局通道拦截。
此注解类似于 <int:channel-interceptor>
XML 元素(请参阅 全局通道拦截器配置)。
@GlobalChannelInterceptor
注解可以放置在类级别(带有 @Component
构造型注解)或 @Configuration
类中的 @Bean
方法上。
在任何一种情况下,bean 都必须实现 ChannelInterceptor
。
从 5.1 版开始,全局通道拦截器适用于动态注册的通道——例如,使用 beanFactory.initializeBean()
或通过 IntegrationFlowContext
(使用 Java DSL 时)初始化的 bean。
以前,当 bean 在应用程序上下文刷新后创建时,拦截器不适用。
@IntegrationConverter
注解将 Converter
、GenericConverter
或 ConverterFactory
bean 标记为 integrationConversionService
的候选转换器。
此注解类似于 <int:converter>
XML 元素(请参阅 有效载荷类型转换)。
您可以将 @IntegrationConverter
注解放置在类级别(带有 @Component
构造型注解)或 @Configuration
类中的 @Bean
方法上。
有关消息注解的更多信息,请参阅 注解支持。
编程注意事项
Spring Integration 中的大多数类(除非另有说明)必须在应用程序上下文中声明为 bean 并作为单例。
这意味着这些类的实例是线程安全的,它们的生命周期以及与其他组件的连接由 Spring 依赖注入容器管理。
实用工具和构建器类(JacksonJsonUtils
、MessageBuilder
、ExpressionEvalMap
、IntegrationReactiveUtils
等)可以直接在 Java 代码中使用。
但是,Java DSL 工厂和 IntegrationComponentSpec
实现结果仍然必须注册为应用程序上下文中的 bean。
许多模块中存在的 Session
抽象不是线程安全的,通常由 Factory
模式实现创建,并从线程安全的 Template
模式中使用。
例如,请参阅 SftpRemoteFileTemplate
及其与 DefaultSftpSessionFactory
的关系。
您应该尽可能使用普通 Java 对象(POJO)(用于目标逻辑中的消息处理),并且只有在绝对必要时才在代码中公开框架。 有关更多信息,请参阅 pojo-invocation。
如果您确实将框架暴露给您的类,则需要考虑一些事项,尤其是在应用程序启动期间:
-
如果您的组件是
ApplicationContextAware
,您通常不应在setApplicationContext()
方法中使用ApplicationContext
。 相反,存储一个引用并将此类使用推迟到上下文生命周期的后期。 -
如果您的组件是
InitializingBean
或使用@PostConstruct
方法,请不要从这些初始化方法发送任何消息。 应用程序上下文在调用这些方法时尚未初始化,发送此类消息很可能会失败。 如果您需要在启动期间发送消息,请实现ApplicationListener
并等待ContextRefreshedEvent
。 或者,实现SmartLifecycle
,将您的 bean 放在后期阶段,并从start()
方法发送消息。
使用打包(例如,着色)Jar 时的注意事项
Spring Integration 使用 Spring Framework 的 SpringFactories
机制加载多个 IntegrationConfigurationInitializer
类来引导某些功能。
这包括 -core
jar 以及其他一些 jar,包括 -http
和 -jmx
。
此过程的信息存储在每个 jar 的 META-INF/spring.factories
文件中。
一些开发人员喜欢使用知名工具(例如 Apache Maven Shade Plugin)将他们的应用程序和所有依赖项重新打包到一个 jar 中。
默认情况下,shade 插件在生成着色 jar 时不会合并 spring.factories
文件。
除了 spring.factories
,其他 META-INF
文件(spring.handlers
和 spring.schemas
)用于 XML 配置。
这些文件也需要合并。
Spring Boot 的可执行 jar 机制 采用不同的方法,它嵌套 jar,从而在类路径上保留每个 spring.factories
文件。
因此,对于 Spring Boot 应用程序,如果您使用其默认的可执行 jar 格式,则无需执行其他操作。
即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具通过为上述文件添加转换器来增强 shade 插件。 以下示例显示了如何配置插件:
...
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<dependencies>
<dependency> [id="CO2-1"]1
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers> [id="CO2-2"]2
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
...
具体来说,
<1> 添加 spring-boot-maven-plugin
作为依赖项。
<2> 配置转换器。
您可以添加 ${spring.boot.version}
的属性或使用显式版本。
编程技巧和诀窍
本节介绍了一些充分利用 Spring Integration 的方法。
XML 模式
使用 XML 配置时,为避免出现错误的模式验证错误,您应该使用 “Spring 感知” IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。
这些 IDE 知道如何从类路径解析正确的 XML 模式(通过使用 jar 中的 META-INF/spring.schemas
文件)。
当使用 STS 或带有插件的 Eclipse 时,您必须在项目上启用 Spring Project Nature
。
某些旧模块(版本 1.0 中存在的模块)在互联网上托管的模式是 1.0 版本,以实现兼容性。 如果您的 IDE 使用这些模式,您很可能会看到错误的错误。
这些在线模式中的每一个都有一个类似的警告:
此模式适用于 Spring Integration Core 的 1.0 版本。 我们无法将其更新到当前模式,因为这会破坏使用 1.0.3 或更低版本的任何应用程序。 对于后续版本,将从类路径解析“无版本”模式并从 jar 中获取。 请参阅 GitHub: [role="bare"][role="bare"]github.com/spring-projects/spring-integration/tree/main/spring-integration-core/src/main/resources/org/springframework/integration/config
受影响的模块是
-
core
(spring-integration.xsd
) -
file
-
http
-
jms
-
mail
-
security
-
stream
-
ws
-
xml
查找 Java 和 DSL 配置的类名
使用 XML 配置和 Spring Integration 命名空间支持时,XML 解析器隐藏了目标 bean 的声明和连接方式。 对于 Java 配置,了解目标最终用户应用程序的框架 API 非常重要。
EIP 实现的一等公民是 Message
、Channel
和 Endpoint
(参见本章前面的 overview-components)。
它们的实现(合同)是:
-
org.springframework.messaging.Message
: 参见 overview-components-message; -
org.springframework.messaging.MessageChannel
: 参见 消息通道; -
org.springframework.integration.endpoint.AbstractEndpoint
: 参见 轮询器。
前两个足够简单,可以理解如何实现、配置和使用。 最后一个值得更多关注。
AbstractEndpoint
在 Spring 框架中广泛用于不同的组件实现。
其主要实现包括:
-
EventDrivenConsumer
,当我们订阅SubscribableChannel
以监听消息时使用。 -
PollingConsumer
,当我们从PollableChannel
轮询消息时使用。
当您使用消息传递注解或 Java DSL 时,您无需担心这些组件,因为框架会自动使用适当的注解和 BeanPostProcessor
实现来生成它们。
手动构建组件时,您应该使用 ConsumerEndpointFactoryBean
来帮助根据提供的 inputChannel
属性确定要创建的目标 AbstractEndpoint
消费者实现。
另一方面,ConsumerEndpointFactoryBean
委托给框架中的另一个一等公民 —— org.springframework.messaging.MessageHandler
。
此接口实现的目标是处理端点从通道消费的消息。
Spring Integration 中的所有 EIP 组件都是 MessageHandler
实现(例如,AggregatingMessageHandler
、MessageTransformingHandler
、AbstractMessageSplitter
等)。
目标协议出站适配器(FileWritingMessageHandler
、HttpRequestExecutingMessageHandler
、AbstractMqttMessageHandler
等)也是 MessageHandler
实现。
当您使用 Java 配置开发 Spring Integration 应用程序时,您应该查看 Spring Integration 模块以找到适合 @ServiceActivator
配置的 MessageHandler
实现。
例如,要发送 XMPP 消息(参见 XMPP 支持),您应该配置如下内容:
@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);
DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
xmppHeaderMapper.setRequestHeaderNames("*");
handler.setHeaderMapper(xmppHeaderMapper);
return handler;
}
MessageHandler
实现代表消息流的出站和处理部分。
入站消息流端有其自己的组件,它们分为轮询和监听行为。
监听(消息驱动)组件很简单,通常只需要一个目标类实现即可准备好生成消息。
监听组件可以是单向 MessageProducerSupport
实现(例如 AbstractMqttMessageDrivenChannelAdapter
和 ImapIdleChannelAdapter
),也可以是请求-回复 MessagingGatewaySupport
实现(例如 AmqpInboundGateway
和 AbstractWebServiceInboundGateway
)。
轮询入站端点适用于那些不提供监听器 API 或不打算用于此类行为的协议,包括任何基于文件的协议(例如 FTP)、任何数据库(RDBMS 或 NoSQL)等。
这些入站端点由两个组件组成:轮询器配置,用于定期启动轮询任务,以及消息源类,用于从目标协议读取数据并为下游集成流生成消息。
轮询器配置的第一类是 SourcePollingChannelAdapter
。
它是另一个 AbstractEndpoint
实现,但专门用于轮询以启动集成流。
通常,使用消息传递注解或 Java DSL 时,您无需担心此类。
框架会根据 @InboundChannelAdapter
配置或 Java DSL 构建器规范为其生成一个 bean。
消息源组件对于目标应用程序开发更为重要,它们都实现了 MessageSource
接口(例如,MongoDbMessageSource
和 AbstractTwitterMessageSource
)。
考虑到这一点,我们从 JDBC 读取 RDBMS 表数据的配置可能如下所示:
@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}
您可以在特定的 Spring Integration 模块中找到目标协议所需的所有入站和出站类(在大多数情况下,在相应的包中)。
例如,spring-integration-websocket
适配器是:
-
o.s.i.websocket.inbound.WebSocketInboundChannelAdapter
:实现MessageProducerSupport
,用于监听套接字上的帧并将消息生成到通道。 -
o.s.i.websocket.outbound.WebSocketOutboundMessageHandler
:单向AbstractMessageHandler
实现,用于将传入消息转换为适当的帧并通过 websocket 发送。
如果您熟悉 Spring Integration XML 配置,从 4.3 版本开始,我们在 XSD 元素定义中提供了有关哪些目标类用于为适配器或网关声明 bean 的信息,如下例所示:
<xsd:element name="outbound-async-gateway">
<xsd:annotation>
<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
</xsd:documentation>
</xsd:annotation>
POJO 方法调用
如 programming-considerations 中所述,我们建议使用 POJO 编程风格,如下例所示:
@ServiceActivator
public String myService(String payload) { ... }
在这种情况下,框架提取 String
有效载荷,调用您的方法,并将结果包装在消息中发送到流中的下一个组件(原始头部被复制到新消息中)。
事实上,如果您使用 XML 配置,您甚至不需要 @ServiceActivator
注解,如下面的成对示例所示:
<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }
只要类上的公共方法没有歧义,您就可以省略 method
属性。
您还可以在 POJO 方法中获取头部信息,如下例所示:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
您还可以取消引用消息上的属性,如下例所示:
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
由于各种 POJO 方法调用都可用,因此 5.0 之前的版本使用 SpEL(Spring 表达式语言)来调用 POJO 方法。
SpEL(即使是解释执行)对于这些操作通常 “足够快”,与方法中通常完成的实际工作相比。
然而,从 5.0 版本开始,默认情况下尽可能使用 org.springframework.messaging.handler.invocation.InvocableHandlerMethod
。
这种技术通常比解释执行的 SpEL 速度更快,并且与其他 Spring 消息传递项目保持一致。
InvocableHandlerMethod
类似于在 Spring MVC 中调用控制器方法的技术。
某些方法在使用 SpEL 时仍然始终被调用。
示例包括带有取消引用属性的注解参数,如前所述。
这是因为 SpEL 具有导航属性路径的能力。
可能还有一些我们没有考虑到的其他特殊情况也无法与 InvocableHandlerMethod
实例一起使用。
因此,在这些情况下,我们会自动回退到使用 SpEL。
如果需要,您还可以设置您的 POJO 方法,使其始终使用 SpEL,使用 UseSpelInvoker
注解,如下例所示:
@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }
如果省略 compilerMode
属性,则 spring.expression.compiler.mode
系统属性决定编译器模式。
有关编译 SpEL 的更多信息,请参阅 SpEL 编译。