Reactive Streams 支持

Spring Integration 在框架的某些地方以及从不同的方面提供了对 Reactive Streams 交互的支持。 我们将在此讨论其中的大部分内容,并在必要时提供指向目标章节的适当链接以获取详细信息。

前言

回顾一下,Spring Integration 扩展了 Spring 编程模型,以支持众所周知的企业集成模式。 Spring Integration 实现了基于 Spring 的应用程序中的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。 Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。 这个目标在目标应用程序中通过 messagechannelendpoint 等一流公民来实现,它们允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生产到通道中,供另一个端点消费。 通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。 这里的关键部分是中间的通道:流的行为取决于其实现,而端点保持不变。

另一方面,Reactive Streams 是一个用于异步流处理并带有非阻塞背压的标准。 Reactive Streams 的主要目标是管理跨异步边界(例如将元素传递给另一个线程或线程池)的流数据交换,同时确保接收方不会被迫缓冲任意数量的数据。 换句话说,背压是该模型不可或缺的一部分,以允许在线程之间进行调解的队列有界。 Reactive Streams 实现(例如 Project Reactor)的意图是在流应用程序的整个处理图中保持这些好处和特性。 Reactive Streams 库的最终目标是以透明和流畅的方式为目标应用程序提供类型、操作符集和支持 API,这与可用的编程语言结构一样可能,但最终解决方案不像普通的函数链调用那样命令式。 它分为两个阶段:定义和执行,执行在稍后订阅最终的反应式发布者时发生,数据需求从定义的底部推送到顶部,并根据需要应用背压——我们请求当前可以处理的尽可能多的事件。 反应式应用程序看起来像一个“流”,或者正如我们在 Spring Integration 术语中习惯的——“流”。 事实上,Java 9 以来的 Reactive Streams SPI 在 java.util.concurrent.Flow 类中呈现。

从这里看,Spring Integration 流似乎非常适合编写 Reactive Streams 应用程序,当我们在端点上应用一些反应式框架操作符时,但实际上问题要广泛得多,我们需要记住并非所有端点(例如 JdbcMessageHandler)都可以透明地在反应式流中处理。 当然,Spring Integration 中对 Reactive Streams 的主要目标是允许整个过程完全反应式、按需启动并准备好背压。 在通道适配器的目标协议和系统提供 Reactive Streams 交互模型之前,这不可能实现。 在下面的部分中,我们将描述 Spring Integration 中为开发反应式应用程序而提供的组件和方法,同时保留集成流结构。

Spring Integration 中所有的 Reactive Streams 交互都使用 Project Reactor 类型实现,例如 MonoFlux

消息网关

与 Reactive Streams 交互的最简单点是 @MessagingGateway,我们只需将网关方法的返回类型设置为 Mono<?> —— 当在返回的 Mono 实例上发生订阅时,网关方法调用背后的整个集成流都将执行。 有关更多信息,请参阅 Reactor Mono。 类似的 Mono 响应方法在框架内部用于完全基于 Reactive Streams 兼容协议的入站网关(有关更多信息,请参阅下面的 reactive-channel-adapters)。 发送和接收操作被封装在 Mono.defer() 中,并在 replyChannel 头部可用时链式调用回复评估。 通过这种方式,特定反应式协议(例如 Netty)的入站组件将作为 Spring Integration 上执行的反应式流的订阅者和启动者。 如果请求负载是反应式类型,最好在反应式流定义中处理它,将处理推迟到启动者订阅。 为此,处理程序方法也必须返回反应式类型。 有关更多信息,请参阅下一节。

反应式回复负载

当生成回复的 MessageHandler 为回复消息返回反应式类型负载时,它会以异步方式进行处理,使用为 outputChannel 提供的常规 MessageChannel 实现(async 必须设置为 true),并在输出通道是 ReactiveStreamsSubscribableChannel 实现(例如 FluxMessageChannel)时通过按需订阅进行扁平化。 在标准的命令式 MessageChannel 用例中,如果回复负载是 多值 发布者(有关更多信息,请参阅 ReactiveAdapter.isMultiValue()),它将被封装在 Mono.just() 中。 结果是,Mono 必须在下游显式订阅或由 FluxMessageChannel 在下游扁平化。 对于 outputChannel 使用 ReactiveStreamsSubscribableChannel,无需担心返回类型和订阅;一切都由框架内部顺利处理。

有关更多信息,请参阅 异步服务激活器

另请参阅 Kotlin 协程 以获取更多信息。

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>> 的组合实现。 一个 Flux 作为热源,内部创建用于从 send() 实现中接收传入消息。 Publisher.subscribe() 实现被委托给该内部 Flux。 此外,为了按需上游消费,FluxMessageChannel 提供了 ReactiveStreamsSubscribableChannel 契约的实现。 为该通道提供的任何上游 Publisher(例如,请参阅下面的源轮询通道适配器和拆分器)在订阅准备好时会自动订阅该通道。 来自此委托发布者的事件被接收到上面提到的内部 Flux 中。

FluxMessageChannel 的消费者必须是 org.reactivestreams.Subscriber 实例,以遵守 Reactive Streams 契约。 幸运的是,Spring Integration 中的所有 MessageHandler 实现也都实现了 Project Reactor 中的 CoreSubscriber。 由于中间的 ReactiveStreamsConsumer 实现,整个集成流配置对目标开发人员来说是透明的。 在这种情况下,流行为从命令式推模型变为反应式拉模型。 ReactiveStreamsConsumer 还可以用于使用 IntegrationReactiveUtils 将任何 MessageChannel 转换为反应式源,从而使集成流部分地反应式。

有关更多信息,请参阅 FluxMessageChannel

从 5.5 版本开始,ConsumerEndpointSpec 引入了一个 reactive() 选项,使流中的端点成为 ReactiveStreamsConsumer,而与输入通道无关。 可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 可以通过 Flux.transform() 操作(例如 publishOn()doOnNext()retry() 等)来自定义输入通道的源 Flux。 此功能通过所有消息注解(@ServiceActivator@Splitter 等)的 reactive() 属性表示为 @Reactive 子注解。

源轮询通道适配器

通常,SourcePollingChannelAdapter 依赖于由 TaskScheduler 启动的任务。 轮询触发器是根据提供的选项构建的,用于定期调度任务以轮询目标数据或事件源。 当 outputChannelReactiveStreamsSubscribableChannel 时,相同的 Trigger 用于确定下一次执行时间,但 SourcePollingChannelAdapter 不是调度任务,而是基于 Flux.generate() 创建一个 Flux<Message<?>> 用于 nextExecutionTime 值和 Mono.delay() 用于从上一步开始的持续时间。 然后使用 Flux.flatMapMany() 轮询 maxMessagesPerPoll 并将其接收到输出 Flux 中。 此生成器 Flux 由提供的 ReactiveStreamsSubscribableChannel 订阅,以遵守下游的背压。 从 5.5 版本开始,当 maxMessagesPerPoll == 0 时,源根本不会被调用,并且 flatMapMany() 会通过 Mono.empty() 结果立即完成,直到 maxMessagesPerPoll 在稍后时间(例如通过控制总线)更改为非零值。 通过这种方式,任何 MessageSource 实现都可以转换为反应式热源。

有关更多信息,请参阅 轮询消费者

事件驱动通道适配器

MessageProducerSupport 是事件驱动通道适配器的基类,通常其 sendMessage(Message<?>) 用作生产驱动程序 API 中的侦听器回调。 当消息生产者实现构建消息 Flux 而不是基于侦听器的功能时,此回调也可以轻松地插入到 doOnNext() Reactor 操作符中。 事实上,当消息生产者的 outputChannel 不是 ReactiveStreamsSubscribableChannel 时,框架中就是这样做的。 然而,为了改善最终用户体验,并允许更多支持背压的功能,MessageProducerSupport 提供了 subscribeToPublisher(Publisher<? extends Message<?>>) API,当 Publisher<Message<?>>> 是来自目标系统的数据源时,可在目标实现中使用。 通常,它在 doStart() 实现中用于调用目标驱动程序 API 以获取源数据的 Publisher。 建议将反应式 MessageProducerSupport 实现与 FluxMessageChannel 作为 outputChannel 结合使用,以实现按需订阅和下游事件消费。 当对 Publisher 的订阅被取消时,通道适配器将进入停止状态。 对此类通道适配器调用 stop() 会完成从源 Publisher 的生产。 通道适配器可以重新启动,并自动订阅到新创建的源 Publisher

消息源到 Reactive Streams

从 5.3 版本开始,提供了 ReactiveMessageSourceProducer。 它是提供的 MessageSource 和事件驱动生产到配置的 outputChannel 的组合。 在内部,它将 MessageSource 包装到重复重新订阅的 Mono 中,生成一个 Flux<Message<?>>,以便订阅上述 subscribeToPublisher(Publisher<? extends Message<?>>)。 对此 Mono 的订阅使用 Schedulers.boundedElastic() 完成,以避免目标 MessageSource 中可能发生的阻塞。 当消息源返回 null(没有数据可拉取)时,Mono 将转换为 repeatWhenEmpty() 状态,并根据订阅者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 条目延迟后续重新订阅。 默认情况下,它是 1 秒。 如果 MessageSource 在消息头中生成带有 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 信息的 Message,则在原始 MonodoOnSuccess() 中确认(如果需要),如果下游流抛出带有失败消息的 MessagingException 以拒绝,则在 doOnError() 中拒绝。 此 ReactiveMessageSourceProducer 可用于任何需要将轮询通道适配器的功能转换为任何现有 MessageSource<?> 实现的反应式按需解决方案的用例。

拆分器和聚合器

AbstractMessageSplitter 获得一个 Publisher 用于其逻辑时,过程自然地遍历 Publisher 中的项目,将它们映射为消息以发送到 outputChannel。 如果此通道是 ReactiveStreamsSubscribableChannel,则 PublisherFlux 包装器会根据该通道的需求订阅,并且此拆分器行为更像 flatMap Reactor 操作符,我们将传入事件映射为多值输出 Publisher。 当整个集成流在拆分器之前和之后都用 FluxMessageChannel 构建时,使其与 Reactive Streams 要求及其用于事件处理的操作符对齐时,这最有意义。 对于常规通道,Publisher 会转换为 Iterable,用于标准的迭代和生产拆分逻辑。

FluxAggregatorMessageHandler 是特定 Reactive Streams 逻辑实现的另一个示例,可以将其视为 Project Reactor 中的“反应式操作符”。 它基于 Flux.groupBy()Flux.window()(或 buffer())操作符。 当创建 FluxAggregatorMessageHandler 时,传入消息会流入 Flux.create(),使其成为热源。 此 Flux 会根据需求由 ReactiveStreamsSubscribableChannel 订阅,或者当 outputChannel 不是反应式时,直接在 FluxAggregatorMessageHandler.start() 中订阅。 当整个集成流在此组件之前和之后都用 FluxMessageChannel 构建时,此 MessageHandler 具有其强大功能,使整个逻辑具备背压能力。

有关更多信息,请参阅 流和 Flux 拆分Flux 聚合器

Java DSL

Java DSL 中的 IntegrationFlow 可以从任何 Publisher 实例开始(请参阅 IntegrationFlow.from(Publisher<Message<T>>))。 此外,通过 IntegrationFlowBuilder.toReactivePublisher() 操作符,IntegrationFlow 可以转换为反应式热源。 在两种情况下,FluxMessageChannel 都在内部使用;它可以根据其 ReactiveStreamsSubscribableChannel 契约订阅入站 Publisher,并且它本身就是下游订阅者的 Publisher<Message<?>>。 通过动态 IntegrationFlow 注册,我们可以实现强大的逻辑,将 Reactive Streams 与此集成流结合起来,桥接到/来自 Publisher

从 5.5.6 版本开始,存在 toReactivePublisher(boolean autoStartOnSubscribe) 操作符变体,用于控制返回的 Publisher<Message<?>> 背后的整个 IntegrationFlow 的生命周期。 通常,对反应式发布者的订阅和消费发生在后期的运行时阶段,而不是在反应式流组合期间,甚至不是在 ApplicationContext 启动期间。 为了避免在 Publisher<Message<?>> 订阅点进行 IntegrationFlow 生命周期管理的样板代码并提供更好的最终用户体验,引入了此带有 autoStartOnSubscribe 标志的新操作符。 它将 IntegrationFlow 及其组件标记为 autoStartup = false(如果为 true),因此 ApplicationContext 不会自动启动流中的消息生产和消费。 相反,IntegrationFlowstart() 是从内部 Flux.doOnSubscribe() 启动的。 无论 autoStartOnSubscribe 的值如何,流都会从 Flux.doOnCancel()Flux.doOnTerminate() 停止——如果没有任何东西可以消费消息,那么生产消息就没有意义。

对于完全相反的用例,当 IntegrationFlow 应该调用反应式流并在完成后继续时,IntegrationFlowDefinition 中提供了 fluxTransform() 操作符。 此时的流被转换为 FluxMessageChannel,它被传播到提供的 fluxFunction 中,并在 Flux.transform() 操作符中执行。 函数的结果被封装到 Mono<Message<?>> 中,以便扁平化到输出 Flux 中,该输出 Flux 由另一个 FluxMessageChannel 订阅以进行下游流。

有关更多信息,请参阅 Java DSL 章

ReactiveMessageHandler

从 5.3 版本开始,框架原生支持 ReactiveMessageHandler。 这种类型的消息处理程序专为反应式客户端设计,这些客户端返回反应式类型以进行低级操作执行的按需订阅,并且不提供任何回复数据以继续反应式流组合。 当 ReactiveMessageHandler 在命令式集成流中使用时,handleMessage() 结果在返回后立即订阅,因为此类流中没有反应式流组合来遵守背压。 在这种情况下,框架将此 ReactiveMessageHandler 封装到 ReactiveMessageHandlerAdapter 中——一个 MessageHandler 的简单实现。 然而,当 ReactiveStreamsConsumer 参与到流中时(例如,当要消费的通道是 FluxMessageChannel 时),此类 ReactiveMessageHandler 会与整个反应式流通过 flatMap() Reactor 操作符组合,以在消费期间遵守背压。

开箱即用的 ReactiveMessageHandler 实现之一是用于出站通道适配器的 ReactiveMongoDbStoringMessageHandler。 有关更多信息,请参阅 MongoDB 反应式通道适配器

从 6.1 版本开始,IntegrationFlowDefinition 暴露了一个方便的 handleReactive(ReactiveMessageHandler) 终端操作符。 任何 ReactiveMessageHandler 实现(甚至只是使用 Mono API 的简单 lambda)都可以用于此操作符。 框架会自动订阅返回的 Mono<Void>。 以下是此操作符可能配置的简单示例:

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

此操作符的重载版本接受 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> 以自定义围绕提供的 ReactiveMessageHandler 的消费者端点。

此外,还提供了基于 ReactiveMessageHandlerSpec 的变体。 在大多数情况下,它们用于特定于协议的通道适配器实现。 请参阅下一节,其中包含指向具有相应反应式通道适配器的目标技术的链接。

反应式通道适配器

当集成目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器变得非常简单。

入站事件驱动通道适配器实现是将请求(如果需要)包装到延迟的 MonoFlux 中,并且仅当协议组件启动对侦听器方法返回的 Mono 的订阅时才执行发送(并生成回复,如果有)。 通过这种方式,我们将反应式流解决方案精确地封装在此组件中。 当然,订阅输出通道的下游集成流应遵守 Reactive Streams 规范,并以按需、支持背压的方式执行。

这并非总是可用的,因为集成流中使用的 MessageHandler 处理器本质上(或当前实现)存在局限性。 当没有反应式实现时,可以使用线程池和队列或 FluxMessageChannel(见上文)在集成端点之前和之后处理此限制。

反应式 事件驱动 入站通道适配器示例:

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法如下:

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或者以声明方式:

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者甚至没有通道适配器,我们总是可以使用 Java DSL,如下所示:

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

反应式出站通道适配器实现是关于启动(或继续)一个反应式流,以根据目标协议提供的反应式 API 与外部系统交互。 入站负载本身可以是反应式类型,也可以是作为整个集成流的事件,该流是顶层反应式流的一部分。 如果我们在单向、即发即弃的场景中,返回的反应式类型可以立即订阅,或者它会传播到下游(请求-回复场景)以进行进一步的集成流或目标业务逻辑中的显式订阅,但仍然保留反应式流语义。

反应式出站通道适配器示例:

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我们将能够使用这两个通道适配器:

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration 提供了 WebFluxRSocketMongoDbR2DBCZeroMQGraphQLApache Cassandra 的通道适配器(或网关)实现。 Redis 流通道适配器 也是反应式的,并使用 Spring Data 中的 ReactiveStreamOperations。 更多的反应式通道适配器正在开发中,例如基于 Spring for Apache KafkaReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplateApache Kafka 等。 对于许多其他非反应式通道适配器,建议使用线程池以避免在反应式流处理期间阻塞。

反应式到命令式上下文传播

Context Propagation 库在类路径上时,Project Reactor 可以获取 ThreadLocal 值(例如 Micrometer ObservationSecurityContextHolder])并将它们存储到 `Subscriber 上下文中。 相反的操作也是可能的,当我们为了跟踪需要填充日志 MDC 或让从反应式流调用的服务从作用域恢复观察时。 有关上下文传播的更多信息,请参阅 Project Reactor 的 文档 中关于其特殊操作符的说明。 如果我们的整个解决方案是单个反应式流组合,则存储和恢复上下文可以顺利进行,因为 Subscriber 上下文从下游一直到组合的开始(FluxMono)都可见。 但是,如果应用程序在不同的 Flux 实例之间切换或切换到命令式处理再返回,则与 Subscriber 绑定的上下文可能不可用。 对于此类用例,Spring Integration 提供了附加功能(从 6.0.5 版本开始),将 Reactor ContextView 存储到从反应式流生成的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 消息头中,例如当我们执行直接 send() 操作时。 然后,此头在 FluxMessageChannel.subscribeTo() 中用于恢复此通道将发出的 Message 的 Reactor 上下文。 目前,此头由 WebFluxInboundEndpointRSocketInboundGateway 组件填充,但可用于执行反应式到命令式集成的任何解决方案。 填充此头的逻辑如下:

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

请注意,我们仍然需要使用 handle() 操作符来使 Reactor 从上下文中恢复 ThreadLocal 值。 即使它作为消息头发送,框架也无法假定它是否会在下游恢复到 ThreadLocal 值。

要在另一个 FluxMono 组合中从 Message 恢复上下文,可以执行以下逻辑:

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));