Apache Camel 支持
Spring Integration 提供 API 和配置,用于与在同一应用程序上下文中声明的 Apache Camel 端点进行通信。 你需要将此依赖项包含到你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-camel</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-camel:{project-version}"
Spring Integration 和 Apache Camel 都实现了企业集成模式,并提供了方便的方式来组合它们,但这两个项目在 API 和抽象实现方面采用了不同的方法。
Spring Integration 完全依赖于 Spring Core 的依赖注入容器。
它使用许多其他 Spring 项目(Spring Data、Spring AMQP、Spring for Apache Kafka 等)来实现其通道适配器。
它还将 MessageChannel
抽象作为一等公民,开发人员在组合其集成流时需要了解这一点。
另一方面,Apache Camel 不提供消息通道的一等公民抽象,并建议通过内部交换来组合其路由,这些交换对 API 是隐藏的。
此外,它还需要一些额外的 依赖项和配置 才能在 Spring 应用程序中使用。
即使最终的企业集成解决方案的各个部分如何实现并不重要,但开发人员体验和高生产力是需要考虑的。
因此,开发人员可能出于多种原因选择一个框架而不是另一个,或者如果某些目标系统支持存在空白,则两者都选择。
Spring Integration 和 Apache Camel 应用程序可以通过许多外部协议相互交互,它们都实现了这些协议的通道适配器。
例如,Spring Integration 流可以将记录发布到 Apache Kafka 主题,该主题由消费者端的 Apache Camel 端点消费。
或者,Apache Camel 路由可以将数据写入 SFTP 文件目录,该目录由 Spring Integration 的 SFTP 入站通道适配器轮询。
或者,在同一个 Spring 应用程序上下文中,它们可以通过 ApplicationEvent
抽象 进行通信。
为了简化开发过程并避免不必要的网络跳转,Apache Camel 提供了一个 模块,用于通过消息通道与 Spring Integration 进行通信。
所需的一切都是对应用程序上下文中的 MessageChannel
的引用,用于发送或消费消息。
当 Apache Camel 路由是消息流的启动者,而 Spring Integration 仅作为解决方案的一部分扮演支持角色时,这工作得很好。
为了获得类似的开发人员体验,Spring Integration 现在提供了一个通道适配器来调用 Apache Camel 端点,并可选地等待回复。
没有入站通道适配器,因为从 Spring Integration API 和抽象的角度来看,订阅 MessageChannel
以消费 Apache Camel 消息就足够了。
Apache Camel 的出站通道适配器
CamelMessageHandler
是 AbstractReplyProducingMessageHandler
的实现,可以以单向(默认)和请求-回复两种模式工作。
它使用 org.apache.camel.ProducerTemplate
向 org.apache.camel.Endpoint
发送(或发送和接收)消息。
交互模式可以通过 ExchangePattern
选项控制(该选项可以在运行时通过 SpEL 表达式根据请求消息进行评估)。
目标 Apache Camel 端点可以明确配置,也可以配置为在运行时评估的 SpEL 表达式。
否则,它将回退到 ProducerTemplate
上提供的 defaultEndpoint
。
除了指定端点之外,还可以提供一个内联的、明确的 LambdaRouteBuilder
,例如,用于调用 Spring Integration 中没有通道适配器支持的 Apache Camel 组件。
此外,可以提供一个 HeaderMapper<org.apache.camel.Message>
(CamelHeaderMapper
是默认实现),以确定在 Spring Integration 和 Apache Camel 消息之间映射哪些头。
默认情况下,所有头都将被映射。
CamelMessageHandler
支持 async
模式,调用 ProducerTemplate.asyncSend()
并生成 CompletableFuture
进行回复处理(如果有的话)。
exchangeProperties
可以通过 SpEL 表达式进行自定义,该表达式必须评估为一个 Map
。
如果未提供 ProducerTemplate
,则通过从应用程序上下文中解析的 CamelContext
bean 创建它。
@Bean
@ServiceActivator(inputChannel = "sendToCamel")
CamelMessageHandler camelService(ProducerTemplate producerTemplate) {
CamelHeaderMapper headerMapper = new CamelHeaderMapper();
headerMapper.setOutboundHeaderNames("");
headerMapper.setInboundHeaderNames("testHeader");
CamelMessageHandler camelMessageHandler = new CamelMessageHandler(producerTemplate);
camelMessageHandler.setEndpointUri("direct:simple");
camelMessageHandler.setExchangePatternExpression(spelExpressionParser.parseExpression("headers.exchangePattern"));
camelMessageHandler.setHeaderMapper(headerMapper);
return camelMessageHandler;
}
对于 Java DSL 流定义,此通道适配器可以通过 Camel
工厂提供的几种变体进行配置:
@Bean
IntegrationFlow camelFlow() {
return f -> f
.handle(Camel.gateway().endpointUri("direct:simple"))
.handle(Camel.route(this::camelRoute))
.handle(Camel.handler().endpointUri("log:com.mycompany.order?level=WARN"));
}
private void camelRoute(RouteBuilder routeBuilder) {
routeBuilder.from("direct:inbound").transform(routeBuilder.simple("${body.toUpperCase()}"));
}