Monitoring
Monitoring Listener Performance
从 2.3 版开始,如果在类路径上检测到 Micrometer,并且应用程序上下文中存在单个 MeterRegistry,则侦听器容器将为侦听器自动创建和更新 Micrometer Timer。可以通过将 ContainerProperty 的 micrometerEnabled 设置为 false 来禁用这些计时器。
始终保留两个计时器 - 一个用于对侦听器的成功调用,另一个用于失败。
这些计时器的名称为 spring.kafka.listener,并带有以下标记:
-
name: (容器 bean 名称) -
result:success或failure -
exception:none或ListenerExecutionFailedException
您可以使用 ContainerProperties 的 micrometerTags 属性添加其他标记。
从 2.9.8、3.0.6 版开始,您可以在 ContainerProperties 的 micrometerTagsProvider 中提供一个函数;该函数接收 ConsumerRecord<?, ?>,并返回可以基于该记录的标记,并与 micrometerTags 中的任何静态标记合并。
|
对于并发容器,将为每个线程创建计时器, |
Monitoring KafkaTemplate Performance
从 2.5 版开始,如果在类路径上检测到 Micrometer,并且应用程序上下文中存在单个 MeterRegistry,则模板将为发送操作自动创建和更新 Micrometer Timer。可以通过将模板的 micrometerEnabled 属性设置为 false 来禁用这些计时器。
始终保留两个计时器 - 一个用于对侦听器的成功调用,另一个用于失败。
这些计时器的名称为 spring.kafka.template,并带有以下标记:
-
name: (模板 bean 名称) -
result:success或failure -
exception: `none`或失败的异常类名称
您可以使用模板的 micrometerTags 属性添加其他标记。
从 2.9.8、3.0.6 版开始,您可以提供一个 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) 属性;该函数接收 ProducerRecord<?, ?>,并返回可以基于该记录的标记,并与 micrometerTags 中的任何静态标记合并。
Micrometer Native Metrics
从版本 2.5 开始,框架提供 Factory Listeners 来管理 Micrometer KafkaClientMetrics 实例,无论何时创建和关闭生产者和使用者。
要启用此功能,只需将监听器添加到您的生产者和消费者工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
传递给监听器的消费者/生产者 id 会以标签名 spring.id 添加到度量的标签中。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
为 StreamsBuilderFactoryBean 提供了类似的侦听器 - 请参阅 KafkaStreams Micrometer Support。
Micrometer Observation
从版本 3.0 开始,现在支持对 KafkaTemplate 和监听器容器使用 Micrometer 进行观察。
在 KafkaTemplate 上将 observationEnabled 设置为 true,然后将 ContainerProperties 设置为 Micrometer Timers 以启用观察;这将禁用 Micrometer Timers,因为计时器现在将随每次观察而管理。
Micrometer Observation 不支持批处理侦听器;这将启用 Micrometer 计时器
有关更多信息,请参阅 Micrometer Tracing。
要向计时器/跟踪添加标签,请分别将自定义 KafkaTemplateObservationConvention 或 KafkaListenerObservationConvention 配置到模板或监听器容器中。
默认实现为模板观察添加 bean.name 标签,为容器添加 listener.id 标签。
您可以对 DefaultKafkaTemplateObservationConvention 或 DefaultKafkaListenerObservationConvention 进行子类化,也可以提供全新的实现。
有关记录的默认观察事项的详细信息,请参阅 Micrometer Observation Documentation。
从版本 3.0.6 开始,您可以根据消费者或生产者记录中的信息,向计时器和跟踪添加动态标签。为此,向监听器容器属性或 KafkaTemplate 添加自定义 KafkaListenerObservationConvention 和/或 KafkaTemplateObservationConvention。两个观察上下文中的 record 属性分别包含 ConsumerRecord 或 ProducerRecord。
发送方和接收方上下文 remoteServiceName 属性已设置成 Kafka clusterId 属性;这是由 KafkaAdmin 检索的。如果由于某种原因(或许是缺乏管理员权限),您无法检索集群 id,从版本 3.1 开始,您可以在 KafkaAdmin 上设置一个手动 clusterId,并将其注入到 KafkaTemplate 和监听器容器。当它为 null(默认设置)时,管理员将调用 describeCluster 管理操作从代理中检索该设置。