配置选项

本节包含 Kafka Streams 绑定器使用的配置选项。 有关绑定器的通用配置选项和属性,请参阅 核心文档

Kafka Streams 绑定器属性

以下属性在绑定器级别可用,并且必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。 Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性都必须以 spring.cloud.stream.kafka.streams.binder 为前缀,而不是 spring.cloud.stream.kafka.binder。 此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,任一前缀都有效。

configuration

包含与 Apache Kafka Streams API 相关的属性的键/值对映射。 此属性必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。 以下是使用此属性的一些示例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能包含在流配置中的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig JavaDocs。 所有可以通过 StreamsConfig 设置的配置都可以通过此设置。 使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别的属性。 如果应用程序中有多个处理器,所有处理器都将获取这些属性。 对于 application.id 等属性,这将成为问题,因此您必须仔细检查 StreamsConfig 中的属性如何使用此绑定器级别的 configuration 属性进行映射。

functions.<function-bean-name>.applicationId

仅适用于函数式处理器。 这可用于为应用程序中的每个函数设置应用程序 ID。 在多个函数的情况下,这是设置应用程序 ID 的便捷方法。

functions.<function-bean-name>.configuration

仅适用于函数式处理器。 包含与 Apache Kafka Streams API 相关的属性的键/值对映射。 这与上面描述的绑定器级别的 configuration 属性类似,但此级别的 configuration 属性仅限于命名函数。 当您有多个处理器并且希望根据特定函数限制对配置的访问时,您可能希望使用此功能。 所有 StreamsConfig 属性都可以在此处使用。

brokers

代理 URL

默认值:localhost

zkNodes

Zookeeper URL

默认值:localhost

deserializationExceptionHandler

反序列化错误处理程序类型。 此处理程序在绑定器级别应用,因此应用于应用程序中的所有输入绑定。 有一种方法可以在消费者绑定级别以更精细的方式控制它。 可能的值是 - logAndContinuelogAndFailskipAndContinuesendToDlq

默认值:logAndFail

applicationId

在绑定器级别全局设置 Kafka Streams 应用程序的 application.id 的便捷方法。 如果应用程序包含多个函数,则应以不同方式设置应用程序 ID。 有关设置应用程序 ID 的详细信息,请参阅上文。

默认值:应用程序将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。

stateStoreRetry.maxAttempts

尝试连接状态存储的最大尝试次数。

默认值:1

stateStoreRetry.backoffPeriod

尝试在重试时连接状态存储的退避周期。

默认值:1000 毫秒

consumerProperties

绑定器级别的任意消费者属性。

producerProperties

绑定器级别的任意生产者属性。

includeStoppedProcessorsForHealthCheck

当通过 actuator 停止处理器的绑定时,此处理器默认将不参与健康检查。 将此属性设置为 true 以启用所有处理器的健康检查,包括通过绑定 actuator 端点当前停止的处理器。

默认值:false

Kafka Streams 生产者属性

以下属性_仅_适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 为前缀。 为方便起见,如果存在多个输出绑定并且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.producer. 进行配置。

keySerde

要使用的键 Serde

默认值:请参阅上面关于消息反/序列化的讨论

valueSerde

要使用的值 Serde

默认值:请参阅上面关于消息反/序列化的讨论

useNativeEncoding

启用/禁用原生编码的标志

默认值:true

streamPartitionerBeanName

在消费者端使用的自定义出站分区器 bean 名称。 应用程序可以提供自定义的 StreamPartitioner 作为 Spring bean,并且可以将此 bean 的名称提供给生产者以代替默认的。

默认值:请参阅上面关于出站分区支持的讨论。

producedAs

处理器正在生产到的 Sink 组件的自定义名称。

默认值:none(由 Kafka Streams 生成)

Kafka Streams 消费者属性

以下属性适用于 Kafka Streams 消费者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 为前缀。 为方便起见,如果存在多个输入绑定并且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.consumer. 进行配置。

applicationId

为每个输入绑定设置 application.id。

默认值:请参阅上文。

keySerde

要使用的键 Serde

默认值:请参阅上面关于消息反/序列化的讨论

valueSerde

要使用的值 Serde

默认值:请参阅上面关于消息反/序列化的讨论

materializedAs

使用传入 KTable 类型时要具体化的状态存储

默认值:none

useNativeDecoding

启用/禁用原生解码的标志

默认值:true

dlqName

DLQ 主题名称。

默认值:请参阅上面关于错误处理和 DLQ 的讨论。

startOffset

如果没有已提交的偏移量可供消费,则从该偏移量开始消费。 这主要在消费者首次从主题消费时使用。 Kafka Streams 默认使用 earliest 策略,绑定器也使用相同的默认值。 可以使用此属性将其覆盖为 latest

默认值:earliest

Note: 在消费者上使用 resetOffsets 对 Kafka Streams 绑定器没有影响。 与基于消息通道的绑定器不同,Kafka Streams 绑定器不会按需查找开头或结尾。

deserializationExceptionHandler

反序列化错误处理程序类型。 此处理程序应用于每个消费者绑定,而不是之前描述的绑定器级别属性。 可能的值是 - logAndContinuelogAndFailskipAndContinuesendToDlq

默认值:logAndFail

timestampExtractorBeanName

在消费者端使用的特定时间戳提取器 bean 名称。 应用程序可以提供 TimestampExtractor 作为 Spring bean,并且可以将此 bean 的名称提供给消费者以代替默认的。

默认值:请参阅上面关于时间戳提取器的讨论。

eventTypes

此绑定支持的事件类型列表,以逗号分隔。

默认值:none

eventTypeHeaderKey

通过此绑定传入的每个记录上的事件类型头键。

默认值:event_type

consumedAs

处理器正在消费的源组件的自定义名称。

默认值:none(由 Kafka Streams 生成)

关于并发的特别说明

在 Kafka Streams 中,您可以使用 num.stream.threads 属性控制处理器可以创建的线程数。 这可以通过上面在绑定器、函数、生产者或消费者级别描述的各种 configuration 选项来完成。 您还可以为此目的使用核心 Spring Cloud Stream 提供的 concurrency 属性。 使用此属性时,您需要在消费者端使用它。 当您有多个输入绑定时,请将其设置在第一个输入绑定上。 例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,它将被绑定器翻译为 num.stream.threads。 如果您有多个处理器,并且一个处理器定义了绑定级别并发,而其他处理器没有,那么那些没有绑定级别并发的处理器将默认回通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的绑定器范围属性。 如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。