事务绑定器

通过将 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 设置为非空值(例如 tx-)来启用事务。 在处理器应用程序中使用时,消费者会启动事务;在消费者线程上发送的任何记录都参与同一事务。 当监听器正常退出时,监听器容器会将偏移量发送到事务并提交。 所有使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性配置的生产者绑定都使用一个通用的生产者工厂;单个绑定 Kafka 生产者属性将被忽略。

正常的绑定器重试(和死信)在事务中不受支持,因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也将回滚。 当启用重试时(通用属性 maxAttempts 大于零),重试属性用于配置 DefaultAfterRollbackProcessor 以在容器级别启用重试。 同样,死信记录的发布不再在事务中进行,此功能已移至监听器容器,同样通过在主事务回滚后运行的 DefaultAfterRollbackProcessor 来实现。

如果希望在源应用程序中使用事务,或者从某个任意线程进行仅生产者事务(例如 @Scheduled 方法),则必须获取事务生产者工厂的引用,并使用它定义一个 KafkaTransactionManager bean。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用 BinderFactory 获取绑定器的引用;当只配置了一个绑定器时,在第一个参数中使用 null。 如果配置了多个绑定器,请使用绑定器名称来获取引用。 一旦我们获得了绑定器的引用,我们就可以获取 ProducerFactory 的引用并创建一个事务管理器。 然后你将使用正常的 Spring 事务支持,例如 TransactionTemplate@Transactional,例如:

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果你希望将仅生产者事务与来自其他事务管理器的事务同步,请使用 ChainedTransactionManager

如果部署应用程序的多个实例,每个实例都需要唯一的 transactionIdPrefix

Kafka 事务中的异常重试行为

配置事务回滚重试行为

在 Kafka 事务中处理消息时,可以使用 defaultRetryable 属性和 retryableExceptions 映射来配置在事务回滚后应重试哪些异常。

默认重试行为

DefaultAfterRollbackProcessor 决定哪些异常在事务回滚后触发重试。 默认情况下,所有异常都将重试,但你可以修改此行为:

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             defaultRetryable: false  # 将默认值更改为不重试异常

defaultRetryable 设置为 false 时,DefaultAfterRollbackProcessor 将配置为 defaultFalse(true),这意味着除非明确配置为可重试,否则异常将不会重试。

异常特定配置

为了进行精细控制,你可以为单个异常类型指定重试行为:

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             retryableExceptions:
               java.lang.IllegalStateException: true    # 始终重试此异常
               java.lang.IllegalArgumentException: false  # 永不重试此异常

DefaultAfterRollbackProcessor 将对标记为 true 的异常使用 addRetryableExceptions(),对标记为 false 的异常使用 addNotRetryableExceptions()。 这些异常特定的配置优先于默认行为。

实现细节

  • 在使用事务时,retryableExceptions 中只能配置异常类型(Exception 的子类)

  • 如果指定了非 Exception 类型,将抛出 IllegalArgumentException

  • DefaultAfterRollbackProcessor 仅在启用事务且禁用批处理模式时才配置

  • 此配置确保事务重试行为与非事务性重试处理一致