StreamsBuilderFactoryBean configurer

通常需要自定义创建 KafkaStreams 对象的 StreamsBuilderFactoryBean。基于 Spring Kafka 提供的基础支持,粘合剂允许你自定义 StreamsBuilderFactoryBean。你可以使用 StreamsBuilderFactoryBeanConfigurer 来自定义 StreamsBuilderFactoryBean 本身。然后,一旦你通过此配置程序访问了 StreamsBuilderFactoryBean,你就可以使用 KafkaStreamsCustomzier 来自定义相应的 KafkaStreams。这两个自定义程序都是 Spring for Apache Kafka 项目的一部分。

It is often required to customize the StreamsBuilderFactoryBean that creates the KafkaStreams objects. Based on the underlying support provided by Spring Kafka, the binder allows you to customize the StreamsBuilderFactoryBean. You can use the StreamsBuilderFactoryBeanConfigurer to customize the StreamsBuilderFactoryBean itself. Then, once you get access to the StreamsBuilderFactoryBean through this configurer, you can customize the corresponding KafkaStreams using KafkaStreamsCustomzier. Both of these customizers are part of the Spring for Apache Kafka project.

这里有一个使用 StreamsBuilderFactoryBeanConfigurer 的示例。

Here is an example of using the StreamsBuilderFactoryBeanConfigurer.

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上述内容展示了你可以执行的自定义 StreamsBuilderFactoryBean 操作。你可以调用 StreamsBuilderFactoryBean 中的任何可用的突变操作来自定义它。该自定义程序将在工厂 bean 启动之前由粘合剂进行调用。

The above is shown as an illustration of the things you can do to customize the StreamsBuilderFactoryBean. You can essentially call any available mutation operations from StreamsBuilderFactoryBean to customize it. This customizer will be invoked by the binder right before the factory bean is started.

一旦你访问了 StreamsBuilderFactoryBean,你还可以自定义底层的 KafkaStreams 对象。以下是可以这么做的蓝图。

Once you get access to the StreamsBuilderFactoryBean, you can also customize the underlying KafkaStreams object. Here is a blueprint for doing so.

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer 将在底层的 KafkaStreams 启动之前由 StreamsBuilderFactoryBeabn 进行调用。

KafkaStreamsCustomizer will be called by the StreamsBuilderFactoryBeabn right before the underlying KafkaStreams gets started.

在整个应用程序中只能有一个 StreamsBuilderFactoryBeanConfigurer。那么我们如何解释多个 Kafka Streams 处理器,因为每个处理器都由单独的 StreamsBuilderFactoryBean 对象提供支持?在这种情况下,如果那些处理器的定制需要有所不同,那么应用程序需要基于应用程序 ID 应用一些过滤器。

There can only be one StreamsBuilderFactoryBeanConfigurer in the entire application. Then how do we account for multiple Kafka Streams processors as each of them are backed up by individual StreamsBuilderFactoryBean objects? In that case, if the customization needs to be different for those processors, then the application needs to apply some filter based on the application ID.

例如:

For e.g,

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

Using StreamsBuilderFactoryBeanConfigurer to register a global state store

如上所述,粘合剂不提供一种一流的方式来注册全局状态商店作为一项功能。为此,你需要使用自定义程序。以下是如何做到这一点。

As mentioned above, the binder does not provide a first class way to register global state stores as a feature. For that, you need to use the customizer. Here is how that can be done.

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}

同样,如果你有多个处理器,你希望通过使用应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean 对象将全局状态存储区附加到正确的 StreamsBuilder

Again, if you have multiple processors, you want to attach the global state store to the right StreamsBuilder by filtering out the other StreamsBuilderFactoryBean objects using the application id as outlined above.

Using StreamsBuilderFactoryBeanConfigurer to register a production exception handler

在错误处理部分,我们指出粘合剂不提供一流的方式来处理生产异常。尽管事实如此,你仍然可以使用 StreamsBuilderFacotryBean 自定义程序来注册生产异常处理程序。见下文。

In the error handling section, we indicated that the binder does not provide a first class way to deal with production exceptions. Though that is the case, you can still use the StreamsBuilderFacotryBean customizer to register production exception handlers. See below.

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

再次声明,如果你有多个处理器,你可能希望针对正确的 StreamsBuilderFactoryBean 来适当设置它。你还可以使用配置属性添加这样的生产异常处理程序(有关该内容的更多信息,请参见下文),但如果你选择采用编程方法,这是一个不错的选择。

Once again, if you have multiple processors, you may want to set it appropriately against the correct StreamsBuilderFactoryBean. You may also add such production exception handlers using the configuration property (See below for more on that), but this is an option if you choose to go with a programmatic approach.