配置选项
本节包含 Apache Kafka 绑定器使用的配置选项。 有关绑定器的常见配置选项和属性,请参阅核心文档中的 绑定属性。
Kafka 绑定器属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka 绑定器连接的代理列表。
默认值:
localhost
。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
允许指定带或不带端口信息的主机(例如,host1,host2:port2
)。 当代理列表中未配置端口时,此属性设置默认端口。默认值:
9092
。 - spring.cloud.stream.kafka.binder.configuration
-
客户端属性(包括生产者和消费者)的键/值映射,传递给绑定器创建的所有客户端。 由于这些属性同时被生产者和消费者使用,因此使用应限于通用属性——例如,安全设置。 通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。 此处的属性会覆盖在 boot 中设置的任何属性。
默认值:空映射。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客户端消费者属性的键/值映射。 除了支持已知的 Kafka 消费者属性外,此处也允许未知的消费者属性。 此处的属性会覆盖在 boot 中以及上述
configuration
属性中设置的任何属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.headers
-
绑定器传输的自定义头列表。 仅在与
kafka-clients
版本 < 0.11.0.0 的旧应用程序 (⇐ 1.3.x) 通信时需要。新版本原生支持头。默认值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
等待获取分区信息的时间,以秒为单位。 如果此计时器过期,则健康状况报告为 down。
默认值:60。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的 ack 数量。 有关生产者
acks
属性,请参阅 Kafka 文档。默认值:
1
。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅当
autoCreateTopics
或autoAddPartitions
设置为 true 时有效。 绑定器在其生产或消费数据的 topics 上配置的全局最小分区数。 它可以被生产者的partitionCount
设置或生产者的instanceCount * concurrency
设置的值(如果任何一个更大)所取代。默认值:
1
。 - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,此处也允许未知的生产者属性。 此处的属性会覆盖在 boot 中以及上述
configuration
属性中设置的任何属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics
处于活动状态,则自动创建 topic 的复制因子。 可以在每个绑定上覆盖。如果您使用的是早于 2.4 版本的 Kafka 代理,则此值应至少设置为
1
。 从 3.0.8 版本开始,绑定器使用-1
作为默认值,这表示将使用代理的 'default.replication.factor' 属性来确定副本数量。 请咨询您的 Kafka 代理管理员,了解是否存在要求最小复制因子的策略,如果是这样,通常default.replication.factor
将匹配该值,并且应使用-1
,除非您需要大于最小值的复制因子。默认值:
-1
。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true
,绑定器会自动创建新的 topic。 如果设置为false
,绑定器依赖于已配置的 topic。 在后一种情况下,如果 topic 不存在,绑定器将无法启动。此设置独立于代理的
auto.create.topics.enable
设置,并且不影响它。 如果服务器设置为自动创建 topic,它们可能会作为元数据检索请求的一部分创建,并使用默认代理设置。默认值:
true
。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true
,绑定器会根据需要创建新的分区。 如果设置为false
,绑定器依赖于已配置的 topic 分区大小。 如果目标 topic 的分区计数小于预期值,绑定器将无法启动。默认值:
false
。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在绑定器中启用事务。请参阅 Kafka 文档中的
transaction.id
和spring-kafka
文档中的 事务。 启用事务后,单个producer
属性将被忽略,所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*
属性。默认值:
null
(无事务) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务绑定器中生产者的全局生产者属性。 请参阅
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和 Kafka 生产者属性 以及所有绑定器支持的通用生产者属性。默认值:请参阅单个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
KafkaHeaderMapper
的 bean 名称,用于将spring-messaging
头映射到 Kafka 头以及从 Kafka 头映射回来。 例如,如果您希望自定义使用 JSON 反序列化头的BinderHeaderMapper
bean 中的受信任包,请使用此项。 如果此自定义BinderHeaderMapper
bean 未通过此属性提供给绑定器,则绑定器将查找名为kafkaBinderHeaderMapper
且类型为BinderHeaderMapper
的头映射器 bean,然后才回退到绑定器创建的默认BinderHeaderMapper
。默认值:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
标志,用于在发现 topic 上的任何分区(无论哪个消费者正在从中接收数据)没有 leader 时,将绑定器健康状况设置为
down
。默认值:
true
。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
当信任库或密钥库证书位置作为非本地文件系统资源(org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等)提供时, 绑定器将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的位置。 这对于代理级别证书(
ssl.truststore.location
和ssl.keystore.location
)和用于模式注册表的证书(schema.registry.ssl.truststore.location
和schema.registry.ssl.keystore.location
)都是如此。 请记住,信任库和密钥库位置路径必须在spring.cloud.stream.kafka.binder.configuration…
下提供。 例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
等。 文件将复制到此属性值指定的位置,该位置必须是文件系统上存在的、可由运行应用程序的进程写入的目录。 如果未设置此值且证书文件是非本地文件系统资源,则它将复制到System.getProperty("java.io.tmpdir")
返回的系统临时目录。 如果此值存在但找不到目录或不可写入,则也是如此。默认值:无。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
设置为 true 时,每次访问度量时都会计算每个消费者 topic 的偏移量滞后度量。 设置为 false 时,仅使用定期计算的偏移量滞后。
默认值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
计算每个消费者 topic 偏移量滞后的间隔。 当
metrics.defaultOffsetLagMetricsEnabled
被禁用或其计算时间过长时,将使用此值。默认值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此绑定器中的所有绑定上启用 Micrometer 观察注册表。
默认值:false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicator
元数据消费者group.id
。 此消费者由HealthIndicator
用于查询有关正在使用的 topic 的元数据。默认值:无。
Kafka 消费者属性
以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer.
为前缀。
为避免重复,Spring Cloud Stream 支持以 |
- admin.configuration
-
从 2.1.1 版本开始,此属性已弃用,取而代之的是
topic.properties
,并且在未来版本中将删除对其的支持。 - admin.replicas-assignment
-
从 2.1.1 版本开始,此属性已弃用,取而代之的是
topic.replicas-assignment
,并且在未来版本中将删除对其的支持。 - admin.replication-factor
-
从 2.1.1 版本开始,此属性已弃用,取而代之的是
topic.replication-factor
,并且在未来版本中将删除对其的支持。 - autoRebalanceEnabled
-
当
true
时,topic 分区会在消费者组的成员之间自动重新平衡。 当false
时,每个消费者根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
被分配一组固定的分区。 这要求在每个启动的实例上都适当地设置spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
属性。 在这种情况下,spring.cloud.stream.instanceCount
属性的值通常必须大于 1。默认值:
true
。 - ackEachRecord
-
当
autoCommitOffset
为true
时,此设置决定是否在处理每个记录后提交偏移量。 默认情况下,在处理完consumer.poll()
返回的批处理中的所有记录后,会提交偏移量。 通过消费者configuration
属性设置的max.poll.records
Kafka 属性可以控制 poll 返回的记录数。 将其设置为true
可能会导致性能下降,但这样做可以减少发生故障时重新传递记录的可能性。 另请参阅绑定器的requiredAcks
属性,该属性也影响提交偏移量的性能。 此属性自 3.1 起已弃用,取而代之的是使用ackMode
。 如果未设置ackMode
且未启用批处理模式,将使用RECORD
ackMode。默认值:
false
。 - autoCommitOffset
-
从 3.1 版本开始,此属性已弃用。 有关替代方案的更多详细信息,请参阅
ackMode
。 消息处理后是否自动提交偏移量。 如果设置为false
,则入站消息中存在一个键为kafka_acknowledgment
、类型为org.springframework.kafka.support.Acknowledgment
的头。 应用程序可以使用此头来确认消息。 有关详细信息,请参阅示例部分。 当此属性设置为false
时,Kafka 绑定器将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
,并且应用程序负责确认记录。 另请参阅ackEachRecord
。默认值:
true
。 - ackMode
-
指定容器确认模式。 这基于 Spring Kafka 中定义的 AckMode 枚举。 如果
ackEachRecord
属性设置为true
且消费者未处于批处理模式,则将使用RECORD
确认模式,否则,使用此属性提供的确认模式。 - autoCommitOnError
-
在可轮询消费者中,如果设置为
true
,则在出错时始终自动提交。 如果未设置(默认)或为 false,则在可轮询消费者中不会自动提交。 请注意,此属性仅适用于可轮询消费者。默认值:未设置。
- resetOffsets
-
是否将消费者上的偏移量重置为
startOffset
提供的值。 如果提供了KafkaBindingRebalanceListener
,则必须为 false;请参阅 rebalance listener 有关此属性的更多信息,请参阅 reset-offsets。默认值:
false
。 - startOffset
-
新组的起始偏移量。 允许值:
earliest
和latest
。 如果为消费者“绑定”显式设置了消费者组(通过spring.cloud.stream.bindings.<channelName>.group
),则“startOffset”设置为earliest
。否则,对于anonymous
消费者组,它设置为latest
。 有关此属性的更多信息,请参阅 reset-offsets。默认值:null(相当于
earliest
)。 - enableDlq
-
设置为 true 时,为消费者启用 DLQ 行为。 默认情况下,导致错误的消息会转发到名为
error.<destination>.<group>
的 topic。 DLQ topic 名称可以通过设置dlqName
属性或定义DlqDestinationResolver
类型的@Bean
来配置。 这为更常见的 Kafka 重放场景提供了一个替代选项,适用于错误数量相对较小且重放整个原始 topic 可能过于繁琐的情况。 有关更多信息,请参阅 kafka dlq processing。 从 2.0 版本开始,发送到 DLQ topic 的消息会添加以下头:x-original-topic
、x-exception-message
和x-exception-stacktrace
作为byte[]
。 默认情况下,失败的记录会发送到 DLQ topic 中与原始记录相同的分区号。 有关如何更改此行为,请参阅 dlq partition selection。 当destinationIsPattern
为true
时不允许使用。默认值:
false
。 - dlqPartitions
-
当
enableDlq
为 true 且未设置此属性时,会创建一个与主 topic 具有相同分区数的死信 topic。 通常,死信记录会发送到死信 topic 中与原始记录相同的分区。 此行为可以更改;请参阅 dlq partition selection。 如果此属性设置为1
且没有DqlPartitionFunction
bean,则所有死信记录都将写入分区0
。 如果此属性大于1
,则 必须 提供DlqPartitionFunction
bean。 请注意,实际分区计数受绑定器的minPartitionCount
属性影响。默认值:
none
- configuration
-
包含通用 Kafka 消费者属性的键/值对映射。 除了 Kafka 消费者属性外,此处还可以传递其他配置属性。 例如,应用程序所需的一些属性,如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
。bootstrap.servers
属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空映射。
- dlqName
-
接收错误消息的 DLQ topic 的名称。
默认值:null(如果未指定,导致错误的消息将转发到名为
error.<destination>.<group>
的 topic)。 - dlqProducerProperties
-
使用此属性可以设置 DLQ 特定的生产者属性。 所有可通过 kafka 生产者属性获得的属性都可以通过此属性设置。 当消费者上启用原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以
dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
的形式提供。默认值:默认 Kafka 生产者属性。
- standardHeaders
-
指示入站通道适配器填充哪些标准头。 允许值:
none
、id
、timestamp
或both
。 如果使用原生反序列化并且接收消息的第一个组件需要id
(例如配置为使用 JDBC 消息存储的聚合器),则很有用。默认值:
none
- converterBeanName
-
实现
RecordMessageConverter
的 bean 名称。用于入站通道适配器以替换默认的MessagingMessageConverter
。默认值:
null
- idleEventInterval
-
事件之间的时间间隔(以毫秒为单位),指示最近没有收到消息。 使用
ApplicationListener<ListenerContainerIdleEvent>
来接收这些事件。 有关使用示例,请参阅 pause-resume。默认值:
30000
- destinationIsPattern
-
当为 true 时,目标被视为用于由代理匹配 topic 名称的正则表达式
Pattern
。 当为 true 时,不提供 topic,并且不允许enableDlq
,因为绑定器在 provision 阶段不知道 topic 名称。 请注意,检测匹配模式的新 topic 所需的时间由消费者属性metadata.max.age.ms
控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。 这可以使用上面的configuration
属性进行配置。默认值:
false
- topic.properties
-
Kafka topic 属性的
Map
,用于 provision 新 topic——例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。 用于 provision 新 topic。 请参阅
kafka-clients
jar 中的NewTopic
Javadoc。默认值:无。
- topic.replication-factor
-
provision topic 时使用的复制因子。覆盖绑定器范围的设置。 如果存在
replicas-assignments
,则忽略。默认值:无(使用绑定器范围的默认值 -1)。
- pollTimeout
-
可轮询消费者中用于轮询的超时时间。
默认值:5 秒。
- transactionManager
-
KafkaAwareTransactionManager
的 bean 名称,用于覆盖此绑定的绑定器事务管理器。 通常需要它,如果您想使用ChainedKafkaTransactionManaager
将另一个事务与 Kafka 事务同步。 为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。默认值:无。
- txCommitRecovered
-
当使用事务绑定器时,恢复记录的偏移量(例如,当重试耗尽且记录发送到死信 topic 时)将通过新事务提交,默认情况下。 将此属性设置为
false
可抑制提交恢复记录的偏移量。默认值:true。
- commonErrorHandlerBeanName
-
每个消费者绑定使用的
CommonErrorHandler
bean 名称。 当存在时,此用户提供的CommonErrorHandler
优先于绑定器定义的任何其他错误处理程序。 这是一种方便表达错误处理程序的方式,如果应用程序不想使用ListenerContainerCustomizer
然后检查目标/组组合来设置错误处理程序。默认值:无。
Kafka 生产者属性
以下属性仅适用于 Kafka 生产者,并且
必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer.
为前缀。
为避免重复,Spring Cloud Stream 支持以 |
- admin.configuration
-
从 2.1.1 版本开始,此属性已弃用,取而代之的是
topic.properties
,并且在未来版本中将删除对其的支持。 - admin.replicas-assignment
-
从 2.1.1 版本开始,此属性已弃用,取而代之的是
topic.replicas-assignment
,并且在未来版本中将删除对其的支持。 - admin.replication-factor
-
从 2.1.1 版本开始,此属性已弃用,取而代之的是
topic.replication-factor
,并且在未来版本中将删除对其的支持。 - bufferSize
-
Kafka 生产者在发送前尝试批量处理的数据上限(以字节为单位)。
默认值:
16384
。 - sync
-
生产者是否同步。
默认值:
false
。 - sendTimeoutExpression
-
一个 SpEL 表达式,针对出站消息进行评估,用于评估启用同步发布时等待 ack 的时间——例如,
headers['mySendTimeout']
。 超时值以毫秒为单位。 在 3.0 之前的版本中,除非使用原生编码,否则不能使用 payload,因为在评估此表达式时,payload 已经采用byte[]
的形式。 现在,在转换 payload 之前评估表达式。默认值:
none
。 - batchTimeout
-
生产者等待允许更多消息在同一批次中累积然后发送消息的时间。 (通常,生产者根本不会等待,而只是发送在上次发送进行时累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。
默认值:
0
。 - messageKeyExpression
-
一个 SpEL 表达式,针对出站消息进行评估,用于填充生成的 Kafka 消息的键——例如,
headers['myKey']
。 在 3.0 之前的版本中,除非使用原生编码,否则不能使用 payload,因为在评估此表达式时,payload 已经采用byte[]
的形式。 现在,在转换 payload 之前评估表达式。 对于常规处理器(Function<String, String>
或Function<Message<?>, Message<?>>
),如果生成的键需要与来自 topic 的入站键相同,则此属性可以按如下设置。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
对于响应式函数,有一个重要的注意事项。 在这种情况下,应用程序需要手动将头从入站消息复制到出站消息。 您可以设置头,例如myKey
并使用headers['myKey']
,如上所述,或者,为了方便起见,只需设置KafkaHeaders.MESSAGE_KEY
头,您根本不需要设置此属性。默认值:
none
。 - headerPatterns
-
一个逗号分隔的简单模式列表,用于匹配要映射到
ProducerRecord
中的 KafkaHeaders
的 Spring 消息头。 模式可以以通配符(星号)开头或结尾。 模式可以通过前缀!
来否定。 匹配在第一次匹配(肯定或否定)后停止。 例如,!ask,as*
将通过ash
而不是ask
。id
和timestamp
从不映射。默认值:
*
(所有头——除了id
和timestamp
) - configuration
-
包含通用 Kafka 生产者属性的键/值对映射。
bootstrap.servers
属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空映射。
- topic.properties
-
Kafka topic 属性的
Map
,用于 provision 新 topic——例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。 用于 provision 新 topic。 请参阅
kafka-clients
jar 中的NewTopic
Javadoc。默认值:无。
- topic.replication-factor
-
provision topic 时使用的复制因子。覆盖绑定器范围的设置。 如果存在
replicas-assignments
,则忽略。默认值:无(使用绑定器范围的默认值 -1)。
- useTopicHeader
-
设置为
true
以使用出站消息中KafkaHeaders.TOPIC
消息头的值覆盖默认的绑定目标(topic 名称)。 如果头不存在,则使用默认的绑定目标。默认值:
false
。 - recordMetadataChannel
-
MessageChannel
的 bean 名称,用于发送成功的发送结果;该 bean 必须存在于应用程序上下文中。 发送到通道的消息是已发送的消息(如果进行了转换),并带有一个额外的头KafkaHeaders.RECORD_METADATA
。 该头包含 Kafka 客户端提供的RecordMetadata
对象;它包括记录在 topic 中写入的分区和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败的发送将发送到生产者错误通道(如果已配置);请参阅 Kafka 错误通道。 默认值:null。
Kafka 绑定器使用生产者的 |
- compression
-
设置
compression.type
生产者属性。 支持的值为none
、gzip
、snappy
、lz4
和zstd
。 如果您将kafka-clients
jar 覆盖为 2.1.0(或更高版本),如 Spring for Apache Kafka documentation 中所述,并且希望使用zstd
压缩,请使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
。默认值:
none
。 - transactionManager
-
KafkaAwareTransactionManager
的 bean 名称,用于覆盖此绑定的绑定器事务管理器。 通常需要它,如果您想使用ChainedKafkaTransactionManaager
将另一个事务与 Kafka 事务同步。 为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。默认值:无。
- closeTimeout
-
关闭生产者时等待的超时时间(以秒为单位)。
默认值:
30
- allowNonTransactional
-
通常,与事务绑定器关联的所有输出绑定都将在新事务中发布,如果尚未进行事务。 此属性允许您覆盖该行为。 如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非事务已在进行中。
默认值:
false