手动启动 Kafka Streams 处理器
Spring Cloud Stream Kafka Streams 绑定器在 Spring for Apache Kafka 的 StreamsBuilderFactoryBean
之上提供了一个名为 StreamsBuilderFactoryManager
的抽象。
这个管理器 API 用于控制绑定器应用程序中每个处理器对应的多个 StreamsBuilderFactoryBean
。
因此,当使用绑定器时,如果您想手动控制应用程序中各种 StreamsBuilderFactoryBean
对象的自动启动,您需要使用 StreamsBuilderFactoryManager
。
您可以使用属性 spring.kafka.streams.auto-startup
并将其设置为 false
来关闭处理器的自动启动。
然后,在应用程序中,您可以使用如下所示的代码来使用 StreamsBuilderFactoryManager
启动处理器。
@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
return args -> {
sbfm.start();
};
}
当您希望应用程序在主线程中启动,并让 Kafka Streams 处理器单独启动时,此功能非常方便。
例如,当您有一个需要恢复的大型状态存储时,如果处理器像默认情况一样正常启动,这可能会阻止您的应用程序启动。
如果您正在使用某种活跃度探针机制(例如在 Kubernetes 上),它可能会认为应用程序已关闭并尝试重新启动。
为了纠正这种情况,您可以将 spring.kafka.streams.auto-startup
设置为 false
并遵循上述方法。
请记住,当使用 Spring Cloud Stream 绑定器时,您不是直接处理 Spring for Apache Kafka 的 StreamsBuilderFactoryBean
,而是 StreamsBuilderFactoryManager
,因为 StreamsBuilderFactoryBean
对象由绑定器内部管理。