使用 Reactive Kafka 绑定器的基本示例
在本节中,我们将展示一些使用 reactive 绑定器编写 reactive Kafka 应用程序的基本代码片段及其详细信息。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以将上述 uppercase
函数与基于消息通道的 Kafka 绑定器 (spring-cloud-stream-binder-kafka
) 和本节讨论的 reactive Kafka 绑定器 (spring-cloud-stream-binder-kafka-reactive
) 一起使用。
当将此函数与常规 Kafka 绑定器一起使用时,尽管您在应用程序中(即在 uppercase
函数中)使用了 reactive 类型,但您只在函数执行期间获得了 reactive 流。
在函数执行上下文之外,没有 reactive 优势,因为底层绑定器不是基于 reactive 堆栈的。
因此,尽管这看起来像是带来了完整的端到端 reactive 堆栈,但此应用程序只是部分 reactive 的。
现在假设您正在使用适用于 Kafka 的正确 reactive 绑定器 - spring-cloud-stream-binder-kafka-reactive
,并结合上述函数应用程序。
此绑定器实现将从链条的顶端消费到底端发布,提供完整的 reactive 优势。
这是因为底层绑定器是建立在 Reactor Kafka 的核心 API 之上的。
在消费者端,它使用了 KafkaReceiver,这是一个 Kafka 消费者的 reactive 实现。
类似地,在生产者端,它使用了 KafkaSender API,这是一个 Kafka 生产者的 reactive 实现。
由于 reactive Kafka 绑定器的基础是建立在适当的 reactive Kafka API 之上的,因此应用程序可以充分利用 reactive 技术。
当使用此 reactive Kafka 绑定器时,诸如自动背压等 reactive 功能都内置于应用程序中。
从 4.0.2 版本开始,您可以通过分别提供一个或多个 ReceiverOptionsCustomizer
或 SenderOptionsCustomizer
bean 来定制 ReceiverOptions
和 SenderOptions
。
它们是 BiFunction
,接收绑定名称和初始选项,并返回定制的选项。
接口扩展了 Ordered
,因此当存在多个定制器时,它们将按照所需的顺序应用。
绑定器默认不提交偏移量。
从 4.0.2 版本开始,KafkaHeaders.ACKNOWLEDGMENT
头包含一个 ReceiverOffset
对象,您可以通过调用其 acknowledge()
或 commit()
方法来提交偏移量。
@Bean
public Consumer<Flux<Message<String>>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
有关更多信息,请参阅 reactor-kafka
文档和 javadoc。
此外,从 4.0.3 版本开始,可以将 Kafka 消费者属性 reactiveAtmostOnce
设置为 true
,绑定器将在处理每个 poll 返回的记录之前自动提交偏移量。
另外,从 4.0.3 版本开始,您可以将消费者属性 reactiveAutoCommit
设置为 true
,绑定器将在处理每个 poll 返回的记录之后自动提交偏移量。
在这些情况下,不存在确认头。
4.0.2 也提供了 reactiveAutoCommit
,但实现不正确,它的行为类似于 reactiveAtMostOnce
。
以下是使用 reactiveAutoCommit
的示例。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,当使用自动提交时,reactor-kafka
返回 Flux<Flux<ConsumerRecord<?, ?>>>
。
鉴于 Spring 无法访问内部 flux 的内容,应用程序必须处理原生的 ConsumerRecord
;不会对内容应用消息转换或转换服务。
这需要使用原生解码(通过在配置中指定适当类型的 Deserializer
)来返回所需类型的记录键/值。