手动选择性启动 Kafka Streams 处理器

尽管上述方法会通过 StreamsBuilderFactoryManager 无条件地将 auto start 设置为 false 应用于应用程序中的所有 Kafka Streams 处理器,但通常需要只对个别选定的 Kafka Streams 处理器禁用自动启动。 例如,假设您的应用程序中有三个不同的函数(处理器),其中一个处理器不希望作为应用程序启动的一部分而启动。 以下是这种情况的一个示例。

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

在上述场景中,如果您将 spring.kafka.streams.auto-startup 设置为 false,那么在应用程序启动时,所有处理器都不会自动启动。 在这种情况下,您必须通过调用底层 StreamsBuilderFactoryManager 上的 start() 来以编程方式启动它们,如上所述。 然而,如果我们的用例是选择性地禁用一个处理器,那么您必须为该处理器在单独的绑定上设置 auto-startup。 假设我们不希望 process3 函数自动启动。 这是一个 BiFunction,带有两个输入绑定——process3-in-0process3-in-1。 为了避免此处理器自动启动,您可以选择其中任何一个输入绑定并在其上设置 auto-startup。 选择哪个绑定无关紧要;如果您愿意,可以同时将它们都设置为 auto-startupfalse,但一个就足够了。 因为它们共享同一个工厂 bean,所以您不必在两个绑定上都将 autoStartup 设置为 false,但为了清晰起见,这样做可能更有意义。

以下是您可以用来禁用此处理器自动启动的 Spring Cloud Stream 属性。

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false

spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

然后,您可以使用 REST 端点或使用 BindingsEndpoint API 手动启动处理器,如下所示。 为此,您需要确保类路径中包含 Spring Boot actuator 依赖项。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0

@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

有关此机制的更多详细信息,请参阅参考文档中的 此部分

当按照本节所述通过禁用 auto-startup 来控制绑定时,请注意这仅适用于消费者绑定。 换句话说,如果您使用生产者绑定 process3-out-0,它在禁用处理器自动启动方面没有任何效果,尽管此生产者绑定使用与消费者绑定相同的 StreamsBuilderFactoryBean