配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。 有关绑定器的通用配置选项和属性,请参阅 核心文档。
Kafka Streams 绑定器属性
以下属性在绑定器级别可用,并且必须以 spring.cloud.stream.kafka.streams.binder.
为前缀。
Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性都必须以 spring.cloud.stream.kafka.streams.binder
为前缀,而不是 spring.cloud.stream.kafka.binder
。
此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,任一前缀都有效。
- configuration
-
包含与 Apache Kafka Streams API 相关的属性的键/值对映射。 此属性必须以
spring.cloud.stream.kafka.streams.binder.
为前缀。 以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能包含在流配置中的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig
JavaDocs。
所有可以通过 StreamsConfig
设置的配置都可以通过此设置。
使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别的属性。
如果应用程序中有多个处理器,所有处理器都将获取这些属性。
对于 application.id
等属性,这将成为问题,因此您必须仔细检查 StreamsConfig
中的属性如何使用此绑定器级别的 configuration
属性进行映射。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。 这可用于为应用程序中的每个函数设置应用程序 ID。 在多个函数的情况下,这是设置应用程序 ID 的便捷方法。
- functions.<function-bean-name>.configuration
-
仅适用于函数式处理器。 包含与 Apache Kafka Streams API 相关的属性的键/值对映射。 这与上面描述的绑定器级别的
configuration
属性类似,但此级别的configuration
属性仅限于命名函数。 当您有多个处理器并且希望根据特定函数限制对配置的访问时,您可能希望使用此功能。 所有StreamsConfig
属性都可以在此处使用。 - brokers
-
代理 URL
默认值:
localhost
- zkNodes
-
Zookeeper URL
默认值:
localhost
- deserializationExceptionHandler
-
反序列化错误处理程序类型。 此处理程序在绑定器级别应用,因此应用于应用程序中的所有输入绑定。 有一种方法可以在消费者绑定级别以更精细的方式控制它。 可能的值是 -
logAndContinue
、logAndFail
、skipAndContinue
或sendToDlq
默认值:
logAndFail
- applicationId
-
在绑定器级别全局设置 Kafka Streams 应用程序的 application.id 的便捷方法。 如果应用程序包含多个函数,则应以不同方式设置应用程序 ID。 有关设置应用程序 ID 的详细信息,请参阅上文。
默认值:应用程序将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接状态存储的最大尝试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
尝试在重试时连接状态存储的退避周期。
默认值:1000 毫秒
- consumerProperties
-
绑定器级别的任意消费者属性。
- producerProperties
-
绑定器级别的任意生产者属性。
- includeStoppedProcessorsForHealthCheck
-
当通过 actuator 停止处理器的绑定时,此处理器默认将不参与健康检查。 将此属性设置为
true
以启用所有处理器的健康检查,包括通过绑定 actuator 端点当前停止的处理器。默认值:false
Kafka Streams 生产者属性
以下属性_仅_适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为前缀。
为方便起见,如果存在多个输出绑定并且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.producer.
进行配置。
- keySerde
-
要使用的键 Serde
默认值:请参阅上面关于消息反/序列化的讨论
- valueSerde
-
要使用的值 Serde
默认值:请参阅上面关于消息反/序列化的讨论
- useNativeEncoding
-
启用/禁用原生编码的标志
默认值:
true
。 - streamPartitionerBeanName
-
在消费者端使用的自定义出站分区器 bean 名称。 应用程序可以提供自定义的
StreamPartitioner
作为 Spring bean,并且可以将此 bean 的名称提供给生产者以代替默认的。默认值:请参阅上面关于出站分区支持的讨论。
- producedAs
-
处理器正在生产到的 Sink 组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
Kafka Streams 消费者属性
以下属性适用于 Kafka Streams 消费者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为前缀。
为方便起见,如果存在多个输入绑定并且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.consumer.
进行配置。
- applicationId
-
为每个输入绑定设置 application.id。
默认值:请参阅上文。
- keySerde
-
要使用的键 Serde
默认值:请参阅上面关于消息反/序列化的讨论
- valueSerde
-
要使用的值 Serde
默认值:请参阅上面关于消息反/序列化的讨论
- materializedAs
-
使用传入 KTable 类型时要具体化的状态存储
默认值:
none
。 - useNativeDecoding
-
启用/禁用原生解码的标志
默认值:
true
。 - dlqName
-
DLQ 主题名称。
默认值:请参阅上面关于错误处理和 DLQ 的讨论。
- startOffset
-
如果没有已提交的偏移量可供消费,则从该偏移量开始消费。 这主要在消费者首次从主题消费时使用。 Kafka Streams 默认使用
earliest
策略,绑定器也使用相同的默认值。 可以使用此属性将其覆盖为latest
。默认值:
earliest
。
Note: 在消费者上使用 resetOffsets
对 Kafka Streams 绑定器没有影响。
与基于消息通道的绑定器不同,Kafka Streams 绑定器不会按需查找开头或结尾。
- deserializationExceptionHandler
-
反序列化错误处理程序类型。 此处理程序应用于每个消费者绑定,而不是之前描述的绑定器级别属性。 可能的值是 -
logAndContinue
、logAndFail
、skipAndContinue
或sendToDlq
默认值:
logAndFail
- timestampExtractorBeanName
-
在消费者端使用的特定时间戳提取器 bean 名称。 应用程序可以提供
TimestampExtractor
作为 Spring bean,并且可以将此 bean 的名称提供给消费者以代替默认的。默认值:请参阅上面关于时间戳提取器的讨论。
- eventTypes
-
此绑定支持的事件类型列表,以逗号分隔。
默认值:
none
- eventTypeHeaderKey
-
通过此绑定传入的每个记录上的事件类型头键。
默认值:
event_type
- consumedAs
-
处理器正在消费的源组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
关于并发的特别说明
在 Kafka Streams 中,您可以使用 num.stream.threads
属性控制处理器可以创建的线程数。
这可以通过上面在绑定器、函数、生产者或消费者级别描述的各种 configuration
选项来完成。
您还可以为此目的使用核心 Spring Cloud Stream 提供的 concurrency
属性。
使用此属性时,您需要在消费者端使用它。
当您有多个输入绑定时,请将其设置在第一个输入绑定上。
例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency
时,它将被绑定器翻译为 num.stream.threads
。
如果您有多个处理器,并且一个处理器定义了绑定级别并发,而其他处理器没有,那么那些没有绑定级别并发的处理器将默认回通过
spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
指定的绑定器范围属性。
如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。