Handling Exceptions
此部分介绍如何处理在使用 Spring for Apache Kafka 时可能出现的各种异常。
Listener Error Handlers
从版本 2.0 开始,@KafkaListener 注解具有一个新属性:errorHandler。
可以使用 errorHandler 来提供 KafkaListenerErrorHandler 实现的 bean 名称。此函数式接口有一个方法,如下面清单所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
你可以访问由消息转换器生成的 Spring 消息传递 Message<?> 对象和监听器抛出的异常,该异常被包装在 ListenerExecutionFailedException 中。错误处理程序可以抛出原始或新异常,并将其抛出给容器。错误处理程序返回的任何内容都将被忽略。
从版本 2.7 开始,可以在 MessagingMessageConverter 和 BatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这会导致在转换后的 KafkaHeaders.RAW_DATA 头中的 Message<?> 中添加原始 ConsumerRecord。例如,如果你希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer,这将非常有用。在请求/响应场景中,它可能用于在将失败记录捕获到死信主题并进行多次重试后,将失败结果发送给发送者。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口 (ConsumerAwareListenerErrorHandler),可以通过以下方法访问使用者对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口 (ManualAckListenerErrorHandler) 提供在使用手动 AckMode 时访问 Acknowledgment 对象的权限。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
不管哪种情况,你都不应该对使用者执行任何查找,因为容器不会意识到这一点。
Container Error Handlers
从版本 2.8 开始,传统的 ErrorHandler 和 BatchErrorHandler 接口已由新的 CommonErrorHandler 取代。这些错误处理程序可以处理记录和批处理监听器的错误,从而允许单个监听器容器工厂为两种类型的监听器创建容器。提供了 CommonErrorHandler 实现以替换大多数传统框架错误处理程序实现。
关于将自定义错误处理程序迁移到 CommonErrorHandler 的信息,请参阅 Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler。
在使用事务时,默认情况下未配置错误处理程序,以便异常将回滚事务。事务性容器的错误处理由 AfterRollbackProcessor 处理。如果你在使用事务时提供了自定义错误处理程序,则它必须抛出异常,以便事务回滚。
此接口具有一个默认方法 isAckAfterHandle(),容器调用该方法以确定在错误处理程序未引发异常情况返回时是否提交偏移;默认情况下返回 true。
通常,当错误“未处理”时(如执行搜索操作后),框架提供的错误处理程序会引发异常。默认情况下,容器在 ERROR 级别记录此类异常。所有框架错误处理程序都扩展 KafkaExceptionLogLevelAware ,这样可以控制记录此类异常的级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
可以指定一个全局错误处理程序,用于容器工厂中的所有侦听器。下面的示例展示了如何做到这一点:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带注释的侦听器方法引发异常,则会引发异常至容器,并根据容器配置来处理消息。
容器在调用错误处理程序前会提交所有待处理的偏移提交。
如果使用 Spring Boot,只需要将错误处理程序作为 @Bean 添加,Boot 就会将其添加到自动配置的工厂中。
Back Off Handlers
DefaultErrorHandler 等错误处理程序使用 BackOff 来确定重试传递之前等待多长时间。从版本 2.9 开始,你可以配置自定义 BackOffHandler。默认处理程序只是将线程挂起,直到回退时间过去(或容器停止)。框架还提供了 ContainerPausingBackOffHandler,它将暂停监听器容器,直到回退时间过去,然后恢复容器。当延迟长于 max.poll.interval.ms 消费者属性时,这很有用。请注意,实际回退时间的解析将受到 pollTimeout 容器属性的影响。
DefaultErrorHandler
这个新的错误处理程序替换了 SeekToCurrentErrorHandler 和 RecoveringBatchErrorHandler,它们一直是几个发行版中的默认错误处理程序。一种区别在于,批量监听器的回退行为(当抛出 BatchListenerFailedException 之外的异常时)相当于 Retrying Complete Batches。
从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与查找未处理记录偏移量相同的语义,如下所述,但实际上不进行查找。相反,这些记录由侦听器容器保留,并在错误处理程序退出(并执行一次暂停的 poll() 之后重新提交给侦听器,以使使用者保持活动状态;如果正在使用 Non-Blocking Retries 或 ContainerPausingBackOffHandler ,暂停可能会延长到多次轮询)。错误处理程序会返回一个结果给容器,指示当前失败的记录是否可以重新提交,或者是否已经恢复,然后不会再次发送给侦听器。要启用此模式,请将属性 seekAfterError 设置为 false。
错误处理程序可以恢复(跳过)不断失败的记录。默认情况下,在十次失败后,会记录(ERROR 级别的)失败记录。可以使用自定义恢复器 (BiConsumer) 和控制每次传递尝试和延迟时间的 BackOff 来配置处理程序。使用具有 FixedBackOff.UNLIMITED_ATTEMPTS 的 FixedBackOff 会(实际上)产生无限次重试。下列示例配置在尝试三次后的恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此处理程序的自定义实例配置侦听器容器,请将它添加到容器工厂。
例如,对于 @KafkaListener 容器工厂,可以如下添加 DefaultErrorHandler:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录侦听器,这会用 1 秒的退避重试发送最多 2 次(共 3 次发送尝试),而不是默认配置(FixedBackOff(0L, 9))。重试次数用尽后,仅记录失败。
例如,如果 poll 返回六条记录(来自每个分区 0、1、2 的两条),而侦听器在第四条记录上引发异常,容器通过提交偏移来确认前三个消息。DefaultErrorHandler 查找分区 1 的偏移 1 和分区 2 的偏移 0。下一个 poll() 返回未处理的三条记录。
如果 AckMode 是 BATCH,容器会在调用错误处理程序前提交前两个分区的偏移。
对于批处理侦听器,侦听器必须引发 BatchListenerFailedException,以指示批处理中哪个记录失败。
事件顺序为:
-
提交记录的偏移量, 在索引之前。
-
如果重试未耗尽, 则执行寻址, 以便重新传递所有剩余记录(包括失败的记录)。
-
如果重试已耗尽, 则尝试恢复失败的记录(默认仅记录)并执行寻址, 以便重新传递剩余记录(排除失败的记录)。已恢复的记录的偏移量已提交。
-
如果重试已耗尽且恢复失败, 则执行寻址, 就像没有用尽重试一样。
从版本 2.9 开始,可以将 DefaultErrorHandler 配置为提供与以上提及的搜索未处理记录偏移量相同的语义,但实际不进行搜索。相反,错误处理程序创建一个新的 ConsumerRecords<?, ?>,其中仅包含未处理记录,然后将该记录提交至监听器(在执行一次暂停的 poll() 以保持使用者存活之后)。要启用此模式,请将属性 seekAfterError 设置为 false。
默认恢复器会在重试用尽后记录失败记录。您可以使用自定义恢复器,或框架提供的恢复器,如 xref:kafka/annotation-error-handling.adoc#dead-letters[DeadLetterPublishingRecoverer。
当使用 POJO 批处理侦听器(如 List<Thing>)时,如果没有完整的消费者记录可以添加到异常中,只需添加失败记录的索引即可:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < records.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置为 AckMode.MANUAL_IMMEDIATE 时,可以配置错误处理程序来提交已恢复记录的偏移;将 commitRecovered 属性设置为 true。
在使用事务时,DefaultAfterRollbackProcessor 提供类似的功能。请参阅 After-rollback Processor。
DefaultErrorHandler 将某些异常视为致命异常,并且对此类异常跳过重试;恢复程序在首次失败时调用。默认情况下,被视为致命的异常有:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
因为这些异常不太可能在重试传递时得到解决。
可以将更多异常类型添加到不可重试类别,或完全替换分类的异常映射。请参阅 DefaultErrorHandler.addNotRetryableException() 和 DefaultErrorHandler.setClassifications() 的 Javadocs 了解更多信息,以及适用于 spring-retry BinaryExceptionClassifier 的信息。
以下示例向不可重试异常添加了 IllegalArgumentException:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以通过一个或多个 RetryListener 进行配置,以接收重试和恢复进度的通知。从版本 2.8.10 开始,添加了批处理侦听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有关更多信息,请参见 JavaDoc。
如果恢复程序失败(抛出异常),则失败的记录将包含在 seeks 中。如果恢复程序失败,BackOff 将按默认值重置,而且在再次尝试恢复之前,重新交付将再次经历 back offs。要跳过恢复失败后的重试,将错误处理程序的 resetStateOnRecoveryFailure 设置为 false。
可以给错误处理程序提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,用于基于失败的记录和/或异常来确定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null,则将使用处理程序的默认 BackOff。
将 resetStateOnExceptionChange 设置为 true,如果失败之间的异常类型发生更改,则将重新启动重试序列(包括选择新的 BackOff(如果已配置))。当为 false(2.9 版本之前的默认值)时,异常类型不被考虑。
从 2.9 版本开始,现在默认为 true。
另请参阅 Delivery Attempts Header。
Conversion Errors with Batch Error Handlers
从 2.8 版本开始,批处理侦听器现在可以正确处理转换错误,当使用具有 ByteArrayDeserializer、BytesDeserializer 或 StringDeserializer 的 MessageConverter 及 DefaultErrorHandler 时。当发生转换错误时,有效负载将设置为 null,并且记录头中会添加反序列化异常,类似于 ErrorHandlingDeserializer。ConversionException 列表在侦听器中可用,以便侦听器可以抛出 BatchListenerFailedException,以指示出现转换异常的第一个索引。
示例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
Retrying Complete Batches
这现在是 FallbackBatchErrorHandler 的后备行为,其中批处理侦听器抛出不是 BatchListenerFailedException 的异常。
当批处理重新传递时,不能保证批处理具有相同数量的记录和/或重新传递的记录按相同顺序排列。因此,很难轻松维护批处理的重试状态。FallbackBatchErrorHandler 采用以下方法。如果批处理侦听器抛出一个不是 BatchListenerFailedException 的异常,则从记录的内存中批处理执行重试。为了避免在扩展的重试序列期间重新平衡,错误处理程序暂停使用者,在每次重试之前进行轮询,然后休眠以进行回退,并再次调用侦听器。如果/当重试耗尽时,将针对批处理中的每个记录调用 ConsumerRecordRecoverer。如果恢复程序抛出一个异常,或者在睡眠期间线程被打断,则将在下次轮询时重新传递批处理记录。无论结果如何,在退出之前,都会恢复使用者。
此机制不能与事务一起使用。
在等待 BackOff 间隔时,错误处理程序将以短时间睡眠循环,直至达到所需的延迟,同时检查容器是否已停止,允许在 stop() 后立即退出睡眠,而不是导致延迟。
Container Stopping Error Handlers
如果侦听器抛出异常,CommonContainerStoppingErrorHandler 将停止容器。对于记录侦听器,当 AckMode 为 RECORD 时,将提交已处理记录的偏移量。对于记录侦听器,当 AckMode 为任何手动值时,将提交已确认记录的偏移量。对于记录侦听器,当 AckMode 为 BATCH,或对于批处理侦听器,当容器重新启动时,将重放整个批处理。
在容器停止后,将抛出一个包含 ListenerExecutionFailedException 的异常。这是为了使事务回滚(如果启用了事务)。
Delegating Error Handler
CommonDelegatingErrorHandler 可以委派给不同的错误处理程序,具体取决于异常类型。例如,你可能希望对大多数异常调用 DefaultErrorHandler,或对其他异常调用 CommonContainerStoppingErrorHandler。
所有委托都必须共享相同的兼容属性(ackAfterHandle、seekAfterError …)。
Logging Error Handler
CommonLoggingErrorHandler 只会记录异常;对于记录监听器,上个轮训中的剩余记录会传递给监听器。对于批处理监听器,批处理中的所有记录都会被记录。
Using Different Common Error Handlers for Record and Batch Listeners
如果你希望对记录和批处理监听器使用不同的错误处理策略,会提供 CommonMixedErrorHandler 来允许为每个监听器类型配置特定的错误处理程序。
Common Error Handler Summary
-
DefaultErrorHandler -
CommonContainerStoppingErrorHandler -
CommonDelegatingErrorHandler -
CommonLoggingErrorHandler -
CommonMixedErrorHandler
Legacy Error Handlers and Their Replacements
| Legacy Error Handler | Replacement |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
无替换,使用带无穷大 |
|
|
|
无替换,使用 |
Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler
请参阅 CommonErrorHandler 中的 JavaDocs。
要代替 ErrorHandler 或 ConsumerAwareErrorHandler 实现,你应实现 handleOne(),并让 seeksAfterHandle() 返回 false(默认值)。你还应实现 handleOtherException() 来处理发生在记录处理范围之外的异常(例如消费者错误)。
要代替 RemainingRecordsErrorHandler 实现,你应实现 handleRemaining(),并覆盖 seeksAfterHandle() 以返回 true(错误处理程序必须执行必要的搜索)。你还应实现 handleOtherException() 来处理发生在记录处理范围之外的异常(例如消费者错误)。
要代替任何 BatchErrorHandler 实现,你应实现 handleBatch()。你还应实现 handleOtherException() 来处理发生在记录处理范围之外的异常(例如消费者错误)。
After Rollback Processor
使用事务时,如果监听器抛出一个异常(且存在错误处理程序时,该处理程序抛出异常),则会回滚该事务。默认情况下,任何未处理的记录(包括失败的记录)都会在下一个轮询中重新抓取。这通过在 DefaultAfterRollbackProcessor 中执行 seek 操作来实现。对于批处理监听器,会重新处理记录的整个批处理(该容器并不知道批处理中哪个记录失败了)。若要修改此行为,你可以使用自定义 AfterRollbackProcessor 配置监听器容器。例如,对于基于记录的监听器,你可能希望跟踪失败的记录,并在多次尝试后放弃,也许通过将该记录发布到死信主题。
从 2.2 版本开始,DefaultAfterRollbackProcessor 现在可以恢复(跳过)不断失败的记录。默认情况下,经过十次失败后,会记录失败的记录(在“ERROR”级别)。你可以使用自定义恢复程序(BiConsumer)和最大失败数来配置处理器。将 maxFailures 属性设置为负数会导致无限重试。以下示例配置三次尝试后的恢复:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
在不使用事务时,可以通过配置 DefaultErrorHandler 实现类似的功能。请参阅 Container Error Handlers。
从 3.2 版本开始,恢复现在可以恢复(跳过)不断失败的整个批处理记录。设置 ContainerProperties.setBatchRecoverAfterRollback(true) 以启用此功能。
默认行为,恢复无法与批处理监听器一起使用,因为框架不知道批处理中哪条记录会一直失败。在这种情况下,应用程序监听器必须处理一直失败的记录。
从 2.2.5 版本开始,DefaultAfterRollbackProcessor 可以在一个新事务中调用(在失败事务回滚后启动)。然后,如果你使用 DeadLetterPublishingRecoverer 将失败的记录发布,则处理器将恢复记录的偏移量发送到事务中的原始主题/分区。要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecovered 和 kafkaTemplate 属性。
如果恢复程序失败(抛出异常),则失败的记录将包含在 seeks 中。从 2.5.5 版本开始,如果恢复程序失败,BackOff 将按默认值重置,而且在再次尝试恢复之前,重新交付将再次经历 back offs。在较早版本中,BackOff 没有重置,并且在下次失败时重新尝试恢复。要恢复到以前的行为,将处理程序的 resetStateOnRecoveryFailure 属性设置为 false。
从 2.6 版本开始,你现在可以向处理器提供 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以基于失败的记录和/或异常来确定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果该函数返回 null,则将使用处理器的默认 BackOff。
从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生改变,重试序列将重新启动(包括选择一个新的 BackOff,如果这样配置的话)。默认情况下,不考虑异常类型。
从 2.3.1 版本开始,与 DefaultErrorHandler 类似,DefaultAfterRollbackProcessor 将某些异常视为致命异常,并且对这些异常不进行重试;恢复程序在第一次失败时调用。默认情况下,被认为是致命的异常有:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
因为这些异常不太可能在重试传递时得到解决。
你可以在不可重试类别中添加更多异常类型,或者完全替换已分类异常的地图。有关更多信息,请参阅 DefaultAfterRollbackProcessor.setClassifications() 和 spring-retry BinaryExceptionClassifier 的 Javadoc。
以下示例向不可重试异常添加了 IllegalArgumentException:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参阅 Delivery Attempts Header。
使用当前 kafka-clients 时,容器无法检测出 ProducerFencedException 是由负载平衡引起的,还是生产者的 transactional.id 由于超时或失效而被撤销。因为在大多数情况下,此类问题由负载平衡引起,所以容器不会调用 AfterRollbackProcessor(因为它不是适当的,因为我们不再分配给它们分区)。如果您确保超时时间足够每个事务处理,并且定期执行“空”事务(例如通过 ListenerContainerIdleEvent),您可以避免因超时和失效而隔离。或者,您可以将 stopContainerWhenFenced 容器属性设置为 true,而且容器将停止,避免记录丢失。您可以消费 ConsumerStoppedEvent 并检查 Reason 属性是否为 FENCED 以检测此条件。由于该事件还引用了容器,因此您可以使用此事件重新启动容器。
从 2.7 版本开始,错误处理程序会在等待 BackOff 间隔时循环进行短暂休眠,直到达到所需的延迟,同时检查容器是否已停止,从而允许休眠在 stop() 后尽快退出,而不是造成延迟。
从 2.7 版本开始,可以为处理器配置一个或多个 RetryListener,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参见 JavaDoc。
Delivery Attempts Header
以下只适用于记录监听器,不适用于批处理监听器。
从版本 2.5 开始,在使用实现 DeliveryAttemptAware 的 ErrorHandler 或 AfterRollbackProcessor 时,可以启用向记录中添加 KafkaHeaders.DELIVERY_ATTEMPT 头 (kafka_deliveryAttempt)。此头的值为从 1 开始的递增整数。在收到原始 ConsumerRecord<?, ?> 时,整数位于 byte[4] 中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
使用带有 DefaultKafkaHeaderMapper 或 SimpleKafkaHeaderMapper 的 @KafkaListener 时,可通过将 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 作为参数添加到监听器方法中来获取。
要启用此标头的填充,请将容器属性 deliveryAttemptHeader 设置为 true。它在默认情况下被禁用,以避免查找每个记录的状态并添加标头的(小)开销。
DefaultErrorHandler 和 DefaultAfterRollbackProcessor 支持此功能。
Listener Info Header
在某些情况下,了解监听器在哪个容器中运行很有用。
从版本 2.8.4 开始,您现在可以在监听器容器上设置 listenerInfo 属性,或在 @KafkaListener 注释上设置 info 属性。然后,容器会将此信息添加到所有传入消息的 KafkaListener.LISTENER_INFO 标头中;然后可以在记录拦截器、筛选器等中或在监听器本身中使用它。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
在 RecordInterceptor 或 RecordFilterStrategy 实现中使用时,标头在消费者记录中以字节数组的形式出现,使用 KafkaListenerAnnotationBeanPostProcessor 的 charSet 属性进行转换。
标头映射器在从消费者记录创建 MessageHeaders 时也会转换成 String,且永远不会在此标头上映射输出记录。
对于 POJO 批量侦听器,从版本 2.8.6 开始,标头将被复制到批量的每个成员中,并且在转换后还可用作单个 String 参数。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
|
如果批处理监听器有过滤器,并且筛选导致批处理为空,则您需要将 |
如果您收到 List<Message<Thing>>,则信息位于每个 Message<?> 的 KafkaHeaders.LISTENER_INFO 标头中。
见 Batch Listeners 了解更多关于消耗批次的信息。
Publishing Dead-letter Records
当记录的最大失败次数达到时,您可以使用记录回收器配置 DefaultErrorHandler 和 DefaultAfterRollbackProcessor。该框架提供 DeadLetterPublishingRecoverer,该功能将失败的消息发布到另一个主题。回收器需要一个 KafkaTemplate<Object, Object>,它用于发送记录。您还可以选择使用 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> 配置它,它用于解析目标主题和分区。
默认情况下,死信记录会发送到名为 <originalTopic>.DLT 的主题(原始主题名加上 .DLT 后缀),并且属于与原始记录相同的分区。因此,当您使用默认解析器时,死信主题 must have at least as many partitions as the original topic.
如果返回的 TopicPartition 具有负分区,则该分区不会在 ProducerRecord 中设置,因此该分区由 Kafka 选择。从版本 2.2.4 开始,任何 ListenerExecutionFailedException(例如,在 @KafkaListener 方法中检测到异常时抛出)都会通过 groupId 属性得到增强。这允许目标解析器使用此属性,除了 ConsumerRecord 中的信息外,还可用于选择死信主题。
以下示例说明如何连接自定义目标解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录通过以下标头进行增强:
-
KafkaHeaders.DLT_EXCEPTION_FQCN: 异常类名(通常为ListenerExecutionFailedException, 但可以是其他名称)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 异常原因类名(如果存在)(从版本 2.8 开始)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE: The Exception message. -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 异常类名(仅密钥反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 异常堆栈跟踪(仅密钥反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 异常消息(仅密钥反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC: The original topic. -
KafkaHeaders.DLT_ORIGINAL_PARTITION: The original partition. -
KafkaHeaders.DLT_ORIGINAL_OFFSET: The original offset. -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: The original timestamp. -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 未能处理记录的早期消费者组(自 2.8 版本起)。
关键异常仅由 DeserializationException 引起,因此没有 DLT_KEY_EXCEPTION_CAUSE_FQCN。
有两种机制可以添加更多标头。
-
对恢复程序进行子类化,然后重写
createProducerRecord()- 调用 `super.createProducerRecord()`并添加更多标题。 -
提供一个
BiFunction`以接收消费者记录和异常,返回一个 `Headers`对象;来自那里的标题将被复制到最终生产者记录中;另请参阅 Managing Dead Letter Record Headers。使用 `setHeadersFunction()`设置 `BiFunction。
第二种机制实现起来更简单,但第一种机制有更多信息可用,包括已经组装好的标准标头。
从版本 2.3 开始,当与 ErrorHandlingDeserializer 结合使用时,发布者将在死信生产者记录中将记录 value() 还原为无法反序列化的原始值。以前,value() 为 null,用户代码必须从消息头中解码 DeserializationException。此外,您可以从 DeserializationException 中提供多个 KafkaTemplate`s to the publisher; this might be needed, for example, if you want to publish the `byte[],以及使用与成功反序列化的记录不同的序列化程序的值。以下是如何使用 KafkaTemplate`s that use a `String 和 byte[] 序列化程序配置发布者的示例:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键定位适合用于即将发布的 value() 的模板。建议使用 LinkedHashMap,以便按顺序检查键。
在发布 null 值时,并且有多个模板,则回收器将查找适用于 Void 类的模板;如果不存在,则将使用 values().iterator() 中的第一个模板。
自 2.7 版开始,你可以使用 setFailIfSendResultIsError 方法,以便在消息发布失败时引发异常。你还可以使用 setWaitForSendResultTimeout 为发送者成功的验证设置超时。
如果恢复程序失败(抛出异常),则失败的记录将包含在 seeks 中。从 2.5.5 版本开始,如果恢复程序失败,BackOff 将按默认值重置,而且在再次尝试恢复之前,重新交付将再次经历 back offs。在较早版本中,BackOff 没有重置,并且在下次失败时重新尝试恢复。要恢复到以前的行为,将错误处理程序的 resetStateOnRecoveryFailure 属性设置为 false。
从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生改变,重试序列将重新启动(包括选择一个新的 BackOff,如果这样配置的话)。默认情况下,不考虑异常类型。
从 2.3 版开始,恢复程序也可以与 Kafka 流一起使用 - 见 Recovery from Deserialization Exceptions 了解更多信息。
ErrorHandlingDeserializer 将反序列化异常添加到头信息 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER 和 ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER(使用 Java 序列化)。默认情况下,这些头信息不会保留在发布到死信主题的消息中。从 2.7 版开始,如果密钥和值都反序列化失败,那么将这两个的原始值填充到发送到 DLT 的记录中。
如果传入记录相互依赖,但可能会乱序到达,则重新将失败记录重新发布到原始主题的尾部(重复若干次)可能很有用,而不是直接将其发送到死信主题。有关示例,请参见 this Stack Overflow Question。
以下错误处理程序配置将完全执行该操作:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从 2.7 版开始,恢复器检查目标解析器选择的实际分区是否存在。如果分区不存在,则 ProducerRecord 中的分区被设置为 null,从而允许 KafkaProducer 选择分区。你可以通过将 verifyPartition 属性设置为 false 来禁用此检查。
从 3.1 版开始,将 logRecoveryRecord 属性设置为 true 将记录恢复记录和异常。
Managing Dead Letter Record Headers
参考上面 Publishing Dead-letter Records,DeadLetterPublishingRecoverer 具有两个属性,用于在这些头文件已经存在时(例如在重新处理失败的死信记录时,包括在使用 Non-Blocking Retries 时)管理头文件。
-
appendOriginalHeaders(defaulttrue) -
stripPreviousExceptionHeaders(自 2.8 版本起默认true)
Apache Kafka 支持同名多个头信息;要获取“最新”值,可以使用 headers.lastHeader(headerName);要获取多个头信息的迭代器,可以使用 headers.headers(headerName).iterator().
在反复重新发布失败记录时,这些头信息可能会增长(并且最终会导致发布失败,原因是 RecordTooLargeException);对于异常头信息,尤其是堆栈跟踪头信息,尤其如此。
这两个属性的原因是因为,虽然你可能只想保留最近的异常信息,但你可能希望保留记录在每次失败时经过的主题历史记录。
appendOriginalHeaders 应用于名为 ORIGINAL 的所有头信息,而 stripPreviousExceptionHeaders 应用于名为 EXCEPTION 的所有头信息。
从 2.8.4 版开始,你现在可以控制哪些标准头信息将添加到输出记录中。查看 enum HeadersToAdd 了解默认添加的(当前)10 个标准头信息的通用名称(这些不是实际的头信息名称,而只是一个抽象;实际的头信息名称由子类可以重写的 getHeaderNames() 方法设置)。
要排除头信息,请使用 excludeHeaders() 方法;例如,要禁止在头信息中添加异常堆栈跟踪,请使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,你可以通过添加 ExceptionHeadersCreator 完全自定义异常头信息的添加;这也将禁用所有标准异常头信息。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从版本 2.8.4 开始,您现在可以通过 addHeadersFunction 方法提供多个 header 函数。这允许应用附加函数,即使另一个函数已经注册,例如在使用 Non-Blocking Retries 时。
ExponentialBackOffWithMaxRetries Implementation
Spring Framework 提供了许多 BackOff 实现。默认情况下,ExponentialBackOff 将无限重试;在尝试了几次重试后放弃需要计算 maxElapsedTime。自 2.7.3 版开始,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,这是一个收到 maxRetries 属性并自动计算 maxElapsedTime 的子类,这更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
这将在调用恢复器之前重试 1、2、4、8、10、10 秒。