Connecting to Kafka
-
KafkaAdmin——请参见 Configuring Topics -
ProducerFactory——请参见 Sending Messages -
ConsumerFactory——请参见 Receiving Messages
从 2.5 版本开始,每个版本都扩展了 KafkaResourceFactory。这允许在运行时通过向其配置添加 Supplier<String> 来更改引导服务器: setBootstrapServersSupplier(() → …)。这将针对所有新连接调用以获取服务器列表。使用者和生产者通常是长期存在的。若要关闭现有的生产者,请在 DefaultKafkaProducerFactory 上调用 reset()。若要关闭现有的使用者,请在 KafkaListenerEndpointRegistry 上调用 stop()(然后调用 start()),并/或在其他侦听器容器 bean 上调用 stop() 和 start()。
为方便起见,该框架还提供了一个 ABSwitchCluster,它支持两组引导服务器;其中一组在任何时间点都是处于活动状态。配置 ABSwitchCluster 并将其添加到生产者和使用者工厂,以及 KafkaAdmin,通过调用 setBootstrapServersSupplier()。当您想要切换时,请调用 primary() 或 secondary(),并在生产者工厂上调用 reset() 以建立新连接;对于使用者,请 stop() 并 start() 所有侦听器容器。当使用 @KafkaListener 时,请 stop() 并 start() KafkaListenerEndpointRegistry bean。
有关更多信息,请参见 Javadoc。
Factory Listeners
从 2.5 版本开始,可以为 DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 配置一个 Listener,以便在创建或关闭生产者或使用者时接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id 都通过将 client-id 属性(从创建之后的 metrics() 获得)追加到工厂 beanName 属性(用“.”分隔)来创建。
例如,这些侦听器可用于在创建新客户端时创建一个 Micrometer KafkaClientMetrics 实例并绑定该实例(并在客户端关闭时关闭该实例)。
框架提供确切做到此的侦听器;请参见 Micrometer Native Metrics。
Default client ID prefixes
从 3.2 版本开始,对于使用 spring.application.name 属性定义应用程序名称的 Spring Boot 应用程序,此名称现在用作以下客户端类型的自动生成客户端 ID 的默认前缀:
-
未使用使用者组的使用者客户端
-
producer clients
-
admin clients
这使得在服务器端更轻松地识别这些客户端,以便进行故障排除或应用配额。
| Client Type | Without application name | With application name |
|---|---|---|
consumer without consumer group |
consumer-null-1 |
myapp-consumer-1 |
使用消费者组“mygroup”的消费者 |
consumer-mygroup-1 |
consumer-mygroup-1 |
producer |
producer-1 |
myapp-producer-1 |
admin |
adminclient-1 |
myapp-admin-1 |