WebFlux 支持
WebFlux Spring Integration 模块 (spring-integration-webflux
) 允许以响应式方式执行 HTTP 请求和处理入站 HTTP 请求。
你需要将此依赖项包含到你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:{project-version}"
如果是非 Servlet 的服务器配置,则必须包含 io.projectreactor.netty:reactor-netty
依赖项。
WebFlux 支持包括以下网关实现:WebFluxInboundEndpoint
和 WebFluxRequestExecutingMessageHandler
。
此支持完全基于 Spring WebFlux 和 Project Reactor 基础。
有关更多信息,请参阅 HTTP 支持,因为许多选项在响应式和常规 HTTP 组件之间共享。
WebFlux 命名空间支持
Spring Integration 提供了 webflux
命名空间和相应的 Schema 定义。
要将其包含在你的配置中,请在你的应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 入站组件
从 5.0 版本开始,提供了 WebHandler
的 WebFluxInboundEndpoint
实现。
此组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport
,它通过新提取的 BaseHttpInboundEndpoint
共享一些通用选项。
它在 Spring WebFlux 响应式环境(而不是 MVC)中使用。
以下示例显示了 WebFlux 端点的简单实现:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
配置类似于 HttpRequestHandlingEndpointSupport
(在示例之前提到),不同之处在于我们使用 @EnableWebFlux
将 WebFlux 基础设施添加到我们的集成应用程序中。
此外,WebFluxInboundEndpoint
通过使用响应式 HTTP 服务器实现提供的背压、按需能力,执行到下游流的 sendAndReceive
操作。
回复部分也是非阻塞的,并且基于内部 |
你可以使用自定义 ServerCodecConfigurer
、RequestedContentTypeResolver
甚至 ReactiveAdapterRegistry
配置 WebFluxInboundEndpoint
。
后者提供了一种机制,你可以使用它将回复作为任何响应式类型返回:Reactor Flux
、RxJava Observable
、Flowable
等。
通过这种方式,我们可以使用 Spring Integration 组件实现 服务器发送事件 场景,如下例所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
有关更多可能的配置选项,请参阅 请求映射支持 和 跨域资源共享 (CORS) 支持。
当请求体为空或 payloadExpression
返回 null
时,请求参数 (MultiValueMap<String, String>
) 用于目标消息的 payload
以进行处理。
负载验证
从 5.2 版本开始,WebFluxInboundEndpoint
可以配置 Validator
。
与 HTTP 支持 中的 MVC 验证不同,它用于在执行回退和 payloadExpression
函数之前,验证 Publisher
中的元素,请求已由 HttpMessageReader
转换为该 Publisher
。
框架无法假设在构建最终负载后 Publisher
对象可能有多复杂。
如果需要限制对最终负载(或其 Publisher
元素)的验证可见性,则验证应在 WebFlux 端点下游进行。
有关更多信息,请参阅 Spring WebFlux 文档。
无效负载将以 IntegrationWebExchangeBindException
(WebExchangeBindException
扩展)拒绝,其中包含所有验证 Errors
。
有关验证的更多信息,请参阅 Spring Framework 参考手册。
WebFlux 出站组件
WebFluxRequestExecutingMessageHandler
(从 5.0 版本开始)实现类似于 HttpRequestExecutingMessageHandler
。
它使用 Spring Framework WebFlux 模块中的 WebClient
。
要配置它,请定义一个类似于以下的 bean:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="http://localhost/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="http://localhost/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient
exchange()
操作返回 Mono<ClientResponse>
,它通过几个 Mono.map()
步骤映射到 AbstractIntegrationMessageBuilder
作为 WebFluxRequestExecutingMessageHandler
的输出。
与作为 outputChannel
的 ReactiveChannel
一起,Mono<ClientResponse>
评估被推迟,直到下游订阅发生。
否则,它被视为 async
模式,并且 Mono
响应被适配为 SettableListenableFuture
,用于 WebFluxRequestExecutingMessageHandler
的异步回复。
输出消息的目标负载取决于 WebFluxRequestExecutingMessageHandler
配置。
setExpectedResponseType(Class<?>)
或 setExpectedResponseTypeExpression(Expression)
标识响应体元素转换的目标类型。
如果 replyPayloadToFlux
设置为 true
,则响应体将转换为 Flux
,其中每个元素都具有提供的 expectedResponseType
,并且此 Flux
作为负载发送到下游。
之后,你可以使用 拆分器 以响应式方式迭代此 Flux
。
此外,可以将 BodyExtractor<?, ClientHttpResponse>
注入到 WebFluxRequestExecutingMessageHandler
中,而不是 expectedResponseType
和 replyPayloadToFlux
属性。
它可用于低级别访问 ClientHttpResponse
,并更好地控制主体和 HTTP 标头转换。
Spring Integration 提供 ClientHttpResponseBodyExtractor
作为身份函数,以生成(下游)整个 ClientHttpResponse
和任何其他可能的自定义逻辑。
从 5.2 版本开始,WebFluxRequestExecutingMessageHandler
支持响应式 Publisher
、Resource
和 MultiValueMap
类型作为请求消息负载。
内部使用相应的 BodyInserter
填充到 WebClient.RequestBodySpec
中。
当负载是响应式 Publisher
时,可以使用配置的 publisherElementType
或 publisherElementTypeExpression
来确定发布者的元素类型。
表达式必须解析为 Class<?>
、String
(解析为目标 Class<?>
)或 ParameterizedTypeReference
。
从 5.5 版本开始,WebFluxRequestExecutingMessageHandler
暴露了一个 extractResponseBody
标志(默认为 true
),用于仅返回响应体,或返回整个 ResponseEntity
作为回复消息负载,而与提供的 expectedResponseType
或 replyPayloadToFlux
无关。
如果 ResponseEntity
中不存在主体,则忽略此标志并返回整个 ResponseEntity
。
有关更多可能的配置选项,请参阅 HTTP 出站组件。
WebFlux 标头映射
由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 标头映射没有区别。 有关更多可能的选项和用于映射标头的组件,请参阅 HTTP 标头映射。
WebFlux 请求属性
从 6.0 版本开始,WebFluxRequestExecutingMessageHandler
可以配置为通过 setAttributeVariablesExpression()
评估请求属性。
此 SpEL 表达式必须在 Map
中进行评估。
然后,此类映射将传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
HTTP 请求配置回调。
如果需要将键值对象形式的信息从 Message
传递到请求,并且下游过滤器将访问这些属性以进行进一步处理,这将非常有用。