Basic Example using the Reactive Kafka Binder
In this section, we show some basic code snippets for writing a reactive Kafka application using the reactive binder and details around them.
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}
You can use the above uppercase function with both the message channel based Kafka binder (spring-cloud-stream-binder-kafka) as well as the reactive Kafka binder (spring-cloud-stream-binder-kafka-reactive), the topic of discussion in this section.
When using this function with the regular Kafka binder, although you are using reactive types in the application (i.e., in the uppercase function), you only get the reactive streams within the execution of your function.
Outside the function's execution context, there are no reactive benefits, since the underlying binder is not based on the reactive stack.
Therefore, although this might look like it brings a full end-to-end reactive stack, this application is only partially reactive.
Now assume that you are using the proper reactive binder for Kafka, spring-cloud-stream-binder-kafka-reactive, with the above function's application.
This binder implementation gives the full reactive benefits all the way from consumption at the top end to publishing at the bottom end of the chain.
This is because the underlying binder is built on top of Reactor Kafka's core APIs.
On the consumer side, it makes use of KafkaReceiver, which is a reactive implementation of a Kafka consumer.
Similarly, on the producer side, it uses the KafkaSender API, which is the reactive implementation of a Kafka producer.
Since the foundations of the reactive Kafka binder are built upon a proper reactive Kafka API, applications get the full benefits of using reactive technologies.
Things like automatic back pressure, among other reactive capabilities, are built in for the application when using this reactive Kafka binder.
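To make this concrete, the following is a minimal sketch of the reactor-kafka consumer API on which the binder is built. This is plain reactor-kafka, not binder code; the broker address, group id, topic name, and the process() callback are placeholders.

// Plain reactor-kafka, shown only to illustrate the reactive consumer API;
// the broker address, group id, and topic below are placeholders.
ReceiverOptions<String, String> options = ReceiverOptions
        .<String, String>create(Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "demo-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class))
        .subscription(List.of("demo-topic"));

// receive() exposes the consumed records as a Flux, which is what allows
// back pressure to apply across the whole pipeline
Flux<ReceiverRecord<String, String>> records = KafkaReceiver.create(options).receive();
records.subscribe(rec -> {
    process(rec.value());               // placeholder for application logic
    rec.receiverOffset().acknowledge(); // mark the offset as ready to be committed
});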
Starting with version 4.0.2, you can customize the ReceiverOptions and SenderOptions by providing one or more ReceiverOptionsCustomizer or SenderOptionsCustomizer beans, respectively.
They are BiFunctions which receive the binding name and the initial options, returning the customized options.
The interfaces also extend Ordered, so when more than one customizer is present, they are applied in the required order.
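For example, the following sketch adjusts the commit interval for a single binding. It assumes the customizer interface is generic in the record key and value types; the binding name uppercase-in-0 is a placeholder, and commitInterval is just one of the ReceiverOptions properties that could be changed.

@Bean
ReceiverOptionsCustomizer<String, String> receiverOptionsCustomizer() {
    return new ReceiverOptionsCustomizer<>() {

        @Override
        public ReceiverOptions<String, String> apply(String bindingName,
                ReceiverOptions<String, String> options) {
            // "uppercase-in-0" is a placeholder binding name
            if ("uppercase-in-0".equals(bindingName)) {
                return options.commitInterval(Duration.ofSeconds(5));
            }
            return options; // leave other bindings unchanged
        }

        @Override
        public int getOrder() {
            return 0; // only relevant when multiple customizers are present
        }

    };
}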
The binder does not commit offsets by default.
Starting with version 4.0.2, the KafkaHeaders.ACKNOWLEDGMENT header contains a ReceiverOffset object, which allows you to cause the offset to be committed by calling its acknowledge() or commit() methods.
@Bean
public Consumer<Flux<Message<String>>> consume() {
    return flux -> flux
            .doOnNext(msg -> {
                process(msg.getPayload());
                // acknowledge() marks this record's offset for commit
                msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
            })
            .subscribe();
}
Refer to the reactor-kafka documentation and javadocs for more information.
In addition, starting with version 4.0.3, the Kafka consumer property reactiveAtMostOnce can be set to true, and the binder will automatically commit the offsets before the records returned by each poll are processed.
Also, starting with version 4.0.3, you can set the consumer property reactiveAutoCommit to true, and the binder will automatically commit the offsets after the records returned by each poll are processed.
In these cases, the acknowledgment header is not present.
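For example, in application.properties, assuming the usual Kafka consumer binding property prefix (input-in-0 is a placeholder binding name):

# "input-in-0" is a placeholder binding name
spring.cloud.stream.kafka.bindings.input-in-0.consumer.reactiveAutoCommit=true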
4.0.2 also provided reactiveAutoCommit, but the implementation was incorrect; it behaved similarly to reactiveAtMostOnce.
The following is an example of how to use reactiveAutoCommit.
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
    return flux -> flux
            // each inner Flux contains the records returned by a single poll;
            // the offsets are committed after those records are processed
            .doOnNext(inner -> inner
                    .doOnNext(val -> {
                        log.info(val.value());
                    })
                    .subscribe())
            .subscribe();
}
Note that reactor-kafka returns a Flux<Flux<ConsumerRecord<?, ?>>> when using auto commit.
Given that Spring has no access to the contents of the inner flux, the application must deal with the native ConsumerRecord; there is no message conversion or conversion service applied to the contents.
This requires the use of native decoding (by specifying a Deserializer of the appropriate type in the configuration) to return record keys/values of the desired types.