配置选项

本节包含 Apache Kafka 绑定器使用的配置选项。 有关绑定器的常见配置选项和属性,请参阅核心文档中的 绑定属性

Kafka 绑定器属性

spring.cloud.stream.kafka.binder.brokers

Kafka 绑定器连接的代理列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允许指定带或不带端口信息的主机(例如,host1,host2:port2)。 当代理列表中未配置端口时,此属性设置默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.configuration

客户端属性(包括生产者和消费者)的键/值映射,传递给绑定器创建的所有客户端。 由于这些属性同时被生产者和消费者使用,因此使用应限于通用属性——例如,安全设置。 通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。 此处的属性会覆盖在 boot 中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端消费者属性的键/值映射。 除了支持已知的 Kafka 消费者属性外,此处也允许未知的消费者属性。 此处的属性会覆盖在 boot 中以及上述 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.headers

绑定器传输的自定义头列表。 仅在与 kafka-clients 版本 < 0.11.0.0 的旧应用程序 (⇐ 1.3.x) 通信时需要。新版本原生支持头。

默认值:空。

spring.cloud.stream.kafka.binder.healthTimeout

等待获取分区信息的时间,以秒为单位。 如果此计时器过期,则健康状况报告为 down。

默认值:60。

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的 ack 数量。 有关生产者 acks 属性,请参阅 Kafka 文档。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

仅当 autoCreateTopicsautoAddPartitions 设置为 true 时有效。 绑定器在其生产或消费数据的 topics 上配置的全局最小分区数。 它可以被生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置的值(如果任何一个更大)所取代。

默认值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,此处也允许未知的生产者属性。 此处的属性会覆盖在 boot 中以及上述 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 处于活动状态,则自动创建 topic 的复制因子。 可以在每个绑定上覆盖。

如果您使用的是早于 2.4 版本的 Kafka 代理,则此值应至少设置为 1。 从 3.0.8 版本开始,绑定器使用 -1 作为默认值,这表示将使用代理的 'default.replication.factor' 属性来确定副本数量。 请咨询您的 Kafka 代理管理员,了解是否存在要求最小复制因子的策略,如果是这样,通常 default.replication.factor 将匹配该值,并且应使用 -1,除非您需要大于最小值的复制因子。

默认值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为 true,绑定器会自动创建新的 topic。 如果设置为 false,绑定器依赖于已配置的 topic。 在后一种情况下,如果 topic 不存在,绑定器将无法启动。

此设置独立于代理的 auto.create.topics.enable 设置,并且不影响它。 如果服务器设置为自动创建 topic,它们可能会作为元数据检索请求的一部分创建,并使用默认代理设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 true,绑定器会根据需要创建新的分区。 如果设置为 false,绑定器依赖于已配置的 topic 分区大小。 如果目标 topic 的分区计数小于预期值,绑定器将无法启动。

默认值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在绑定器中启用事务。请参阅 Kafka 文档中的 transaction.idspring-kafka 文档中的 事务。 启用事务后,单个 producer 属性将被忽略,所有生产者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认值:null(无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务绑定器中生产者的全局生产者属性。 请参阅 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生产者属性 以及所有绑定器支持的通用生产者属性。

默认值:请参阅单个生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

KafkaHeaderMapper 的 bean 名称,用于将 spring-messaging 头映射到 Kafka 头以及从 Kafka 头映射回来。 例如,如果您希望自定义使用 JSON 反序列化头的 BinderHeaderMapper bean 中的受信任包,请使用此项。 如果此自定义 BinderHeaderMapper bean 未通过此属性提供给绑定器,则绑定器将查找名为 kafkaBinderHeaderMapper 且类型为 BinderHeaderMapper 的头映射器 bean,然后才回退到绑定器创建的默认 BinderHeaderMapper

默认值:无。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

标志,用于在发现 topic 上的任何分区(无论哪个消费者正在从中接收数据)没有 leader 时,将绑定器健康状况设置为 down

默认值:true

spring.cloud.stream.kafka.binder.certificateStoreDirectory

当信任库或密钥库证书位置作为非本地文件系统资源(org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等)提供时, 绑定器将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的位置。 这对于代理级别证书(ssl.truststore.locationssl.keystore.location)和用于模式注册表的证书(schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location)都是如此。 请记住,信任库和密钥库位置路径必须在 spring.cloud.stream.kafka.binder.configuration… 下提供。 例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。 文件将复制到此属性值指定的位置,该位置必须是文件系统上存在的、可由运行应用程序的进程写入的目录。 如果未设置此值且证书文件是非本地文件系统资源,则它将复制到 System.getProperty("java.io.tmpdir") 返回的系统临时目录。 如果此值存在但找不到目录或不可写入,则也是如此。

默认值:无。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

设置为 true 时,每次访问度量时都会计算每个消费者 topic 的偏移量滞后度量。 设置为 false 时,仅使用定期计算的偏移量滞后。

默认值:true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

计算每个消费者 topic 偏移量滞后的间隔。 当 metrics.defaultOffsetLagMetricsEnabled 被禁用或其计算时间过长时,将使用此值。

默认值:60 秒

spring.cloud.stream.kafka.binder.enableObservation

在此绑定器中的所有绑定上启用 Micrometer 观察注册表。

默认值:false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator 元数据消费者 group.id。 此消费者由 HealthIndicator 用于查询有关正在使用的 topic 的元数据。

默认值:无。

Kafka 消费者属性

以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 为前缀。

为避免重复,Spring Cloud Stream 支持以 spring.cloud.stream.kafka.default.consumer.<property>=<value> 的格式为所有通道设置值。

admin.configuration

从 2.1.1 版本开始,此属性已弃用,取而代之的是 topic.properties,并且在未来版本中将删除对其的支持。

admin.replicas-assignment

从 2.1.1 版本开始,此属性已弃用,取而代之的是 topic.replicas-assignment,并且在未来版本中将删除对其的支持。

admin.replication-factor

从 2.1.1 版本开始,此属性已弃用,取而代之的是 topic.replication-factor,并且在未来版本中将删除对其的支持。

autoRebalanceEnabled

true 时,topic 分区会在消费者组的成员之间自动重新平衡。 当 false 时,每个消费者根据 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 被分配一组固定的分区。 这要求在每个启动的实例上都适当地设置 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性。 在这种情况下,spring.cloud.stream.instanceCount 属性的值通常必须大于 1。

默认值:true

ackEachRecord

autoCommitOffsettrue 时,此设置决定是否在处理每个记录后提交偏移量。 默认情况下,在处理完 consumer.poll() 返回的批处理中的所有记录后,会提交偏移量。 通过消费者 configuration 属性设置的 max.poll.records Kafka 属性可以控制 poll 返回的记录数。 将其设置为 true 可能会导致性能下降,但这样做可以减少发生故障时重新传递记录的可能性。 另请参阅绑定器的 requiredAcks 属性,该属性也影响提交偏移量的性能。 此属性自 3.1 起已弃用,取而代之的是使用 ackMode。 如果未设置 ackMode 且未启用批处理模式,将使用 RECORD ackMode。

默认值:false

autoCommitOffset

从 3.1 版本开始,此属性已弃用。 有关替代方案的更多详细信息,请参阅 ackMode。 消息处理后是否自动提交偏移量。 如果设置为 false,则入站消息中存在一个键为 kafka_acknowledgment、类型为 org.springframework.kafka.support.Acknowledgment 的头。 应用程序可以使用此头来确认消息。 有关详细信息,请参阅示例部分。 当此属性设置为 false 时,Kafka 绑定器将 ack 模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,并且应用程序负责确认记录。 另请参阅 ackEachRecord

默认值:true

ackMode

指定容器确认模式。 这基于 Spring Kafka 中定义的 AckMode 枚举。 如果 ackEachRecord 属性设置为 true 且消费者未处于批处理模式,则将使用 RECORD 确认模式,否则,使用此属性提供的确认模式。

autoCommitOnError

在可轮询消费者中,如果设置为 true,则在出错时始终自动提交。 如果未设置(默认)或为 false,则在可轮询消费者中不会自动提交。 请注意,此属性仅适用于可轮询消费者。

默认值:未设置。

resetOffsets

是否将消费者上的偏移量重置为 startOffset 提供的值。 如果提供了 KafkaBindingRebalanceListener,则必须为 false;请参阅 rebalance listener 有关此属性的更多信息,请参阅 reset-offsets

默认值:false

startOffset

新组的起始偏移量。 允许值:earliestlatest。 如果为消费者“绑定”显式设置了消费者组(通过 spring.cloud.stream.bindings.<channelName>.group),则“startOffset”设置为 earliest。否则,对于 anonymous 消费者组,它设置为 latest。 有关此属性的更多信息,请参阅 reset-offsets

默认值:null(相当于 earliest)。

enableDlq

设置为 true 时,为消费者启用 DLQ 行为。 默认情况下,导致错误的消息会转发到名为 error.<destination>.<group> 的 topic。 DLQ topic 名称可以通过设置 dlqName 属性或定义 DlqDestinationResolver 类型的 @Bean 来配置。 这为更常见的 Kafka 重放场景提供了一个替代选项,适用于错误数量相对较小且重放整个原始 topic 可能过于繁琐的情况。 有关更多信息,请参阅 kafka dlq processing。 从 2.0 版本开始,发送到 DLQ topic 的消息会添加以下头:x-original-topicx-exception-messagex-exception-stacktrace 作为 byte[]。 默认情况下,失败的记录会发送到 DLQ topic 中与原始记录相同的分区号。 有关如何更改此行为,请参阅 dlq partition selectiondestinationIsPatterntrue 时不允许使用。

默认值:false

dlqPartitions

enableDlq 为 true 且未设置此属性时,会创建一个与主 topic 具有相同分区数的死信 topic。 通常,死信记录会发送到死信 topic 中与原始记录相同的分区。 此行为可以更改;请参阅 dlq partition selection。 如果此属性设置为 1 且没有 DqlPartitionFunction bean,则所有死信记录都将写入分区 0。 如果此属性大于 1,则 必须 提供 DlqPartitionFunction bean。 请注意,实际分区计数受绑定器的 minPartitionCount 属性影响。

默认值:none

configuration

包含通用 Kafka 消费者属性的键/值对映射。 除了 Kafka 消费者属性外,此处还可以传递其他配置属性。 例如,应用程序所需的一些属性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=barbootstrap.servers 属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。

默认值:空映射。

dlqName

接收错误消息的 DLQ topic 的名称。

默认值:null(如果未指定,导致错误的消息将转发到名为 error.<destination>.<group> 的 topic)。

dlqProducerProperties

使用此属性可以设置 DLQ 特定的生产者属性。 所有可通过 kafka 生产者属性获得的属性都可以通过此属性设置。 当消费者上启用原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

默认值:默认 Kafka 生产者属性。

standardHeaders

指示入站通道适配器填充哪些标准头。 允许值:noneidtimestampboth。 如果使用原生反序列化并且接收消息的第一个组件需要 id(例如配置为使用 JDBC 消息存储的聚合器),则很有用。

默认值:none

converterBeanName

实现 RecordMessageConverter 的 bean 名称。用于入站通道适配器以替换默认的 MessagingMessageConverter

默认值:null

idleEventInterval

事件之间的时间间隔(以毫秒为单位),指示最近没有收到消息。 使用 ApplicationListener<ListenerContainerIdleEvent> 来接收这些事件。 有关使用示例,请参阅 pause-resume

默认值:30000

destinationIsPattern

当为 true 时,目标被视为用于由代理匹配 topic 名称的正则表达式 Pattern。 当为 true 时,不提供 topic,并且不允许 enableDlq,因为绑定器在 provision 阶段不知道 topic 名称。 请注意,检测匹配模式的新 topic 所需的时间由消费者属性 metadata.max.age.ms 控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。 这可以使用上面的 configuration 属性进行配置。

默认值:false

topic.properties

Kafka topic 属性的 Map,用于 provision 新 topic——例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

默认值:无。

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。 用于 provision 新 topic。 请参阅 kafka-clients jar 中的 NewTopic Javadoc。

默认值:无。

topic.replication-factor

provision topic 时使用的复制因子。覆盖绑定器范围的设置。 如果存在 replicas-assignments,则忽略。

默认值:无(使用绑定器范围的默认值 -1)。

pollTimeout

可轮询消费者中用于轮询的超时时间。

默认值:5 秒。

transactionManager

KafkaAwareTransactionManager 的 bean 名称,用于覆盖此绑定的绑定器事务管理器。 通常需要它,如果您想使用 ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步。 为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。

默认值:无。

txCommitRecovered

当使用事务绑定器时,恢复记录的偏移量(例如,当重试耗尽且记录发送到死信 topic 时)将通过新事务提交,默认情况下。 将此属性设置为 false 可抑制提交恢复记录的偏移量。

默认值:true。

commonErrorHandlerBeanName

每个消费者绑定使用的 CommonErrorHandler bean 名称。 当存在时,此用户提供的 CommonErrorHandler 优先于绑定器定义的任何其他错误处理程序。 这是一种方便表达错误处理程序的方式,如果应用程序不想使用 ListenerContainerCustomizer 然后检查目标/组组合来设置错误处理程序。

默认值:无。

Kafka 生产者属性

以下属性仅适用于 Kafka 生产者,并且 必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 为前缀。

为避免重复,Spring Cloud Stream 支持以 spring.cloud.stream.kafka.default.producer.<property>=<value> 的格式为所有通道设置值。

admin.configuration

从 2.1.1 版本开始,此属性已弃用,取而代之的是 topic.properties,并且在未来版本中将删除对其的支持。

admin.replicas-assignment

从 2.1.1 版本开始,此属性已弃用,取而代之的是 topic.replicas-assignment,并且在未来版本中将删除对其的支持。

admin.replication-factor

从 2.1.1 版本开始,此属性已弃用,取而代之的是 topic.replication-factor,并且在未来版本中将删除对其的支持。

bufferSize

Kafka 生产者在发送前尝试批量处理的数据上限(以字节为单位)。

默认值:16384

sync

生产者是否同步。

默认值:false

sendTimeoutExpression

一个 SpEL 表达式,针对出站消息进行评估,用于评估启用同步发布时等待 ack 的时间——例如,headers['mySendTimeout']。 超时值以毫秒为单位。 在 3.0 之前的版本中,除非使用原生编码,否则不能使用 payload,因为在评估此表达式时,payload 已经采用 byte[] 的形式。 现在,在转换 payload 之前评估表达式。

默认值:none

batchTimeout

生产者等待允许更多消息在同一批次中累积然后发送消息的时间。 (通常,生产者根本不会等待,而只是发送在上次发送进行时累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。

默认值:0

messageKeyExpression

一个 SpEL 表达式,针对出站消息进行评估,用于填充生成的 Kafka 消息的键——例如,headers['myKey']。 在 3.0 之前的版本中,除非使用原生编码,否则不能使用 payload,因为在评估此表达式时,payload 已经采用 byte[] 的形式。 现在,在转换 payload 之前评估表达式。 对于常规处理器(Function<String, String>Function<Message<?>, Message<?>>),如果生成的键需要与来自 topic 的入站键相同,则此属性可以按如下设置。 spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 对于响应式函数,有一个重要的注意事项。 在这种情况下,应用程序需要手动将头从入站消息复制到出站消息。 您可以设置头,例如 myKey 并使用 headers['myKey'],如上所述,或者,为了方便起见,只需设置 KafkaHeaders.MESSAGE_KEY 头,您根本不需要设置此属性。

默认值:none

headerPatterns

一个逗号分隔的简单模式列表,用于匹配要映射到 ProducerRecord 中的 Kafka Headers 的 Spring 消息头。 模式可以以通配符(星号)开头或结尾。 模式可以通过前缀 ! 来否定。 匹配在第一次匹配(肯定或否定)后停止。 例如,!ask,as* 将通过 ash 而不是 askidtimestamp 从不映射。

默认值:*(所有头——除了 idtimestamp

configuration

包含通用 Kafka 生产者属性的键/值对映射。 bootstrap.servers 属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。

默认值:空映射。

topic.properties

Kafka topic 属性的 Map,用于 provision 新 topic——例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。 用于 provision 新 topic。 请参阅 kafka-clients jar 中的 NewTopic Javadoc。

默认值:无。

topic.replication-factor

provision topic 时使用的复制因子。覆盖绑定器范围的设置。 如果存在 replicas-assignments,则忽略。

默认值:无(使用绑定器范围的默认值 -1)。

useTopicHeader

设置为 true 以使用出站消息中 KafkaHeaders.TOPIC 消息头的值覆盖默认的绑定目标(topic 名称)。 如果头不存在,则使用默认的绑定目标。

默认值:false

recordMetadataChannel

MessageChannel 的 bean 名称,用于发送成功的发送结果;该 bean 必须存在于应用程序上下文中。 发送到通道的消息是已发送的消息(如果进行了转换),并带有一个额外的头 KafkaHeaders.RECORD_METADATA。 该头包含 Kafka 客户端提供的 RecordMetadata 对象;它包括记录在 topic 中写入的分区和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class) 失败的发送将发送到生产者错误通道(如果已配置);请参阅 Kafka 错误通道。 默认值:null。

Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以给定分区计数创建 topic(结合 minPartitionCount,使用两者中的最大值)。 在为绑定器配置 minPartitionCount 和为应用程序配置 partitionCount 时要谨慎,因为将使用较大的值。 如果已存在分区计数较小的 topic 且 autoAddPartitions 被禁用(默认),绑定器将无法启动。 如果已存在分区计数较小的 topic 且 autoAddPartitions 被启用,则会添加新分区。 如果已存在分区数量大于 (minPartitionCountpartitionCount) 最大值的 topic,则使用现有分区计数。

compression

设置 compression.type 生产者属性。 支持的值为 nonegzipsnappylz4zstd。 如果您将 kafka-clients jar 覆盖为 2.1.0(或更高版本),如 Spring for Apache Kafka documentation 中所述,并且希望使用 zstd 压缩,请使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

默认值:none

transactionManager

KafkaAwareTransactionManager 的 bean 名称,用于覆盖此绑定的绑定器事务管理器。 通常需要它,如果您想使用 ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步。 为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。

默认值:无。

closeTimeout

关闭生产者时等待的超时时间(以秒为单位)。

默认值:30

allowNonTransactional

通常,与事务绑定器关联的所有输出绑定都将在新事务中发布,如果尚未进行事务。 此属性允许您覆盖该行为。 如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非事务已在进行中。

默认值:false