消费批次
从 3.0 版本开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode
设置为 true
时,通过轮询 Kafka Consumer
接收到的所有记录将以 List<?>
的形式呈现给监听器方法。
否则,该方法将一次接收一条记录。
批次大小由 Kafka 消费者属性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;有关更多信息,请参阅 Kafka 文档。
接收批次时,允许使用以下类型签名:
List<Person>
Message<List<Person>>
在 List<Person>
的第一个选项中,监听器将不会获得任何消息头。
如果使用第二个类型签名 (Message<List<Person>>
),则可以访问消息头;但是,所有消息头仍然以 Collection
的形式存在。
让我们看以下示例。
假设 Message
包含一个包含十个 Person
对象的列表。
Message
的 MessageHeaders
包含一个消息头映射,其中键是消息头名称,值是列表。
此列表包含该消息头的值,其顺序与有效负载列表相同。
因此,应用程序需要根据有效负载列表的迭代,从 MessageHeaders
映射中正确访问消息头。
请注意,在批处理模式下消费时,不允许使用 List<Message<Person>>
形式的类型签名。
从 4.0.2
版本开始,绑定器在批处理模式下消费时支持 DLQ 功能。
请记住,当在批处理模式下对消费者绑定使用 DLQ 时,从上一次轮询接收到的所有记录都将传递到 DLQ 主题。
当使用批处理模式时,绑定器内的重试不受支持,因此 maxAttempts
将被覆盖为 1。
您可以配置 DefaultErrorHandler
(使用 ListenerContainerCustomizer
)来实现与绑定器中重试类似的功能。
您还可以使用手动 AckMode
并调用 Ackowledgment.nack(index, sleep)
来提交部分批次的偏移量,并使剩余记录重新投递。
有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。
当在批处理模式下接收 |
批处理模式下消费时的可观测性
在批处理模式下消费记录时,不支持直接的观察跟踪传播功能。
这是因为 Kafka 绑定器使用的 Spring for Apache Kafka 库不支持批处理监听器上的跟踪;它仅支持记录监听器。
在批处理监听器中,接收到的记录可能来自多个主题/分区和多个生产者,其中添加跟踪信息是可选的。
由于批次中的记录之间可能没有任何关联,因此框架无法对它们进行任何跟踪假设,例如将它们作为单个跟踪 ID 提供等。
如果您使用 Message<List<String>>
的类型签名,则可以获得一个名为 kafka_batchConvertedHeaders
的消息头,它包含一个与有效负载条目数量相同的列表。
此列表包含一个 Map
,其中包含跟踪消息头。
但是,应用程序需要正确迭代此列表并启动观察。