Mixing high level DSL and low level Processor API
Kafka Streams 提供两个 API 变体。它有一个高级 DSL 类似的 API,你可以在其中链接许多操作,这些操作可能对许多函数式程序员很熟悉。Kafka Streams 还可以访问低级处理器 API。处理器 API 功能非常强大,可以让你以更低级别的方式控制事物,但本质上是命令式的。适用于 Spring Cloud Stream 的 Kafka Streams 绑定器允许你使用高级 DSL 或同时混合使用 DSL 和处理器 API。同时混合使用这些变体,可以提供许多选项来控制应用程序中的各种用例。应用程序可以使用 transform
或 process
方法的 API 调用来访问处理器 API。
Kafka Streams provides two variants of APIs.
It has a higher level DSL like API where you can chain various operations that maybe familiar to a lot of functional programmers.
Kafka Streams also gives access to a low level Processor API.
The processor API, although very powerful and gives the ability to control things in a much lower level, is imperative in nature.
Kafka Streams binder for Spring Cloud Stream, allows you to use either the high level DSL or mixing both the DSL and the processor API.
Mixing both of these variants give you a lot of options to control various use cases in an application.
Applications can use the transform
or process
method API calls to get access to the processor API.
以下是使用 process
API 在 Spring Cloud Stream 应用程序中组合 DSL 和处理器 API 的示例。
Here is a look at how one may combine both the DSL and the processor API in a Spring Cloud Stream application using the process
API.
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
以下是使用 transform
API 的示例。
Here is an example using the transform
API.
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process
API 方法调用是一个终止操作,而 transform
API 是非终止的,并使用你可以使用 DSL 或处理器 API 继续进一步处理的潜在转换的 KStream
。
The process
API method call is a terminal operation while the transform
API is non terminal and gives you a potentially transformed KStream
using which you can continue further processing using either the DSL or the processor API.