Monitoring

Monitoring Listener Performance

从 2.3 版开始,如果在类路径上检测到 Micrometer,并且应用程序上下文中存在单个 MeterRegistry,则侦听器容器将为侦听器自动创建和更新 Micrometer Timer。可以通过将 ContainerPropertymicrometerEnabled 设置为 false 来禁用这些计时器。

Starting with version 2.3, the listener container will automatically create and update Micrometer Timer`s for the listener, if `Micrometer is detected on the classpath, and a single MeterRegistry is present in the application context. The timers can be disabled by setting the ContainerProperty’s `micrometerEnabled to false.

始终保留两个计时器 - 一个用于对侦听器的成功调用,另一个用于失败。

Two timers are maintained - one for successful calls to the listener and one for failures.

这些计时器的名称为 spring.kafka.listener,并带有以下标记:

The timers are named spring.kafka.listener and have the following tags:

  • name : (container bean name)

  • result : success or failure

  • exception : none or ListenerExecutionFailedException

您可以使用 ContainerPropertiesmicrometerTags 属性添加其他标记。

You can add additional tags using the ContainerProperties’s `micrometerTags property.

从 2.9.8、3.0.6 版开始,您可以在 ContainerPropertiesmicrometerTagsProvider 中提供一个函数;该函数接收 ConsumerRecord<?, ?>,并返回可以基于该记录的标记,并与 micrometerTags 中的任何静态标记合并。

Starting with versions 2.9.8, 3.0.6, you can provide a function in ContainerProperties’s `micrometerTagsProvider; the function receives the ConsumerRecord<?, ?> and returns tags which can be based on that record, and merged with any static tags in micrometerTags.

对于并发容器,将为每个线程创建计时器,name 标记将附加 -n,其中 n 为 0concurrency-1

With the concurrent container, timers are created for each thread and the name tag is suffixed with -n where n is 0 to concurrency-1.

Monitoring KafkaTemplate Performance

从 2.5 版开始,如果在类路径上检测到 Micrometer,并且应用程序上下文中存在单个 MeterRegistry,则模板将为发送操作自动创建和更新 Micrometer Timer。可以通过将模板的 micrometerEnabled 属性设置为 false 来禁用这些计时器。

Starting with version 2.5, the template will automatically create and update Micrometer Timer`s for send operations, if `Micrometer is detected on the classpath, and a single MeterRegistry is present in the application context. The timers can be disabled by setting the template’s micrometerEnabled property to false.

始终保留两个计时器 - 一个用于对侦听器的成功调用,另一个用于失败。

Two timers are maintained - one for successful calls to the listener and one for failures.

这些计时器的名称为 spring.kafka.template,并带有以下标记:

The timers are named spring.kafka.template and have the following tags:

  • name : (template bean name)

  • result : success or failure

  • exception : none or the exception class name for failures

您可以使用模板的 micrometerTags 属性添加其他标记。

You can add additional tags using the template’s micrometerTags property.

从 2.9.8、3.0.6 版开始,您可以提供一个 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) 属性;该函数接收 ProducerRecord<?, ?>,并返回可以基于该记录的标记,并与 micrometerTags 中的任何静态标记合并。

Starting with versions 2.9.8, 3.0.6, you can provide a KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) property; the function receives the ProducerRecord<?, ?> and returns tags which can be based on that record, and merged with any static tags in micrometerTags.

Micrometer Native Metrics

从版本 2.5 开始,框架提供 Factory Listeners 来管理 Micrometer KafkaClientMetrics 实例,无论何时创建和关闭生产者和使用者。

Starting with version 2.5, the framework provides Factory Listeners to manage a Micrometer KafkaClientMetrics instance whenever producers and consumers are created and closed.

要启用此功能,只需将监听器添加到您的生产者和消费者工厂:

To enable this feature, simply add the listeners to your producer and consumer factories:

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

传递给监听器的消费者/生产者 id 会以标签名 spring.id 添加到度量的标签中。

The consumer/producer id passed to the listener is added to the meter’s tags with tag name spring.id.

An example of obtaining one of the Kafka metrics
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

StreamsBuilderFactoryBean 提供了类似的侦听器 - 请参阅 KafkaStreams Micrometer Support

A similar listener is provided for the StreamsBuilderFactoryBean - see KafkaStreams Micrometer Support.

Micrometer Observation

从版本 3.0 开始,现在支持对 KafkaTemplate 和监听器容器使用 Micrometer 进行观察。

Using Micrometer for observation is now supported, since version 3.0, for the KafkaTemplate and listener containers.

KafkaTemplate 上将 observationEnabled 设置为 true,然后将 ContainerProperties 设置为 Micrometer Timers 以启用观察;这将禁用 Micrometer Timers,因为计时器现在将随每次观察而管理。

Set observationEnabled to true on the KafkaTemplate and ContainerProperties to enable observation; this will disable Micrometer Timers because the timers will now be managed with each observation.

Micrometer Observation 不支持批处理侦听器;这将启用 Micrometer 计时器

Micrometer Observation does not support batch listener; this will enable Micrometer Timers

有关更多信息,请参阅 Micrometer Tracing

Refer to Micrometer Tracing for more information.

要向计时器/跟踪添加标签,请分别将自定义 KafkaTemplateObservationConventionKafkaListenerObservationConvention 配置到模板或监听器容器中。

To add tags to timers/traces, configure a custom KafkaTemplateObservationConvention or KafkaListenerObservationConvention to the template or listener container, respectively.

默认实现为模板观察添加 bean.name 标签,为容器添加 listener.id 标签。

The default implementations add the bean.name tag for template observations and listener.id tag for containers.

您可以对 DefaultKafkaTemplateObservationConventionDefaultKafkaListenerObservationConvention 进行子类化,也可以提供全新的实现。

You can either subclass DefaultKafkaTemplateObservationConvention or DefaultKafkaListenerObservationConvention or provide completely new implementations.

有关记录的默认观察事项的详细信息,请参阅 Micrometer Observation Documentation

See Micrometer Observation Documentation for details of the default observations that are recorded.

从版本 3.0.6 开始,您可以根据消费者或生产者记录中的信息,向计时器和跟踪添加动态标签。为此,向监听器容器属性或 KafkaTemplate 添加自定义 KafkaListenerObservationConvention 和/或 KafkaTemplateObservationConvention。两个观察上下文中的 record 属性分别包含 ConsumerRecordProducerRecord

Starting with version 3.0.6, you can add dynamic tags to the timers and traces, based on information in the consumer or producer records. To do so, add a custom KafkaListenerObservationConvention and/or KafkaTemplateObservationConvention to the listener container properties or KafkaTemplate respectively. The record property in both observation contexts contains the ConsumerRecord or ProducerRecord respectively.

发送方和接收方上下文 remoteServiceName 属性已设置成 Kafka clusterId 属性;这是由 KafkaAdmin 检索的。如果由于某种原因(或许是缺乏管理员权限),您无法检索集群 id,从版本 3.1 开始,您可以在 KafkaAdmin 上设置一个手动 clusterId,并将其注入到 KafkaTemplate 和监听器容器。当它为 null(默认设置)时,管理员将调用 describeCluster 管理操作从代理中检索该设置。

The sender and receiver contexts remoteServiceName properties are set to the Kafka clusterId property; this is retrieved by a KafkaAdmin. If, for some reason - perhaps lack of admin permissions, you cannot retrieve the cluster id, starting with version 3.1, you can set a manual clusterId on the KafkaAdmin and inject it into KafkaTemplate s and listener containers. When it is null (default), the admin will invoke the describeCluster admin operation to retrieve it from the broker.