消费批次

从 3.0 版本开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode 设置为 true 时,通过轮询 Kafka Consumer 接收到的所有记录将以 List<?> 的形式呈现给监听器方法。 否则,该方法将一次接收一条记录。 批次大小由 Kafka 消费者属性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;有关更多信息,请参阅 Kafka 文档。 接收批次时,允许使用以下类型签名:

List<Person>
Message<List<Person>>

List<Person> 的第一个选项中,监听器将不会获得任何消息头。 如果使用第二个类型签名 (Message<List<Person>>),则可以访问消息头;但是,所有消息头仍然以 Collection 的形式存在。 让我们看以下示例。 假设 Message 包含一个包含十个 Person 对象的列表。 MessageMessageHeaders 包含一个消息头映射,其中键是消息头名称,值是列表。 此列表包含该消息头的值,其顺序与有效负载列表相同。 因此,应用程序需要根据有效负载列表的迭代,从 MessageHeaders 映射中正确访问消息头。 请注意,在批处理模式下消费时,不允许使用 List<Message<Person>> 形式的类型签名。 从 4.0.2 版本开始,绑定器在批处理模式下消费时支持 DLQ 功能。 请记住,当在批处理模式下对消费者绑定使用 DLQ 时,从上一次轮询接收到的所有记录都将传递到 DLQ 主题。

当使用批处理模式时,绑定器内的重试不受支持,因此 maxAttempts 将被覆盖为 1。 您可以配置 DefaultErrorHandler(使用 ListenerContainerCustomizer)来实现与绑定器中重试类似的功能。 您还可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批次的偏移量,并使剩余记录重新投递。 有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档

当在批处理模式下接收 KafkaNull 对象时,接收到的列表将包含一个对应 KafkaNull 对象的 null 元素。 这对于 List<Person>Message<List<Person>> 样式类型签名都适用。

批处理模式下消费时的可观测性

在批处理模式下消费记录时,不支持直接的观察跟踪传播功能。 这是因为 Kafka 绑定器使用的 Spring for Apache Kafka 库不支持批处理监听器上的跟踪;它仅支持记录监听器。 在批处理监听器中,接收到的记录可能来自多个主题/分区和多个生产者,其中添加跟踪信息是可选的。 由于批次中的记录之间可能没有任何关联,因此框架无法对它们进行任何跟踪假设,例如将它们作为单个跟踪 ID 提供等。 如果您使用 Message<List<String>> 的类型签名,则可以获得一个名为 kafka_batchConvertedHeaders 的消息头,它包含一个与有效负载条目数量相同的列表。 此列表包含一个 Map,其中包含跟踪消息头。 但是,应用程序需要正确迭代此列表并启动观察。