提示、技巧和秘诀

使用 Kafka 实现简单的 DLQ

问题陈述

作为一名开发人员,我希望编写一个消费应用程序,用于处理来自 Kafka 主题的记录。 但是,如果在处理过程中发生某些错误,我不希望应用程序完全停止。 相反,我希望将出错的记录发送到 DLT(死信主题),然后继续处理新记录。

解决方案

解决此问题的方法是使用 Spring Cloud Stream 中的 DLQ 功能。 为了本次讨论的目的,我们假设以下是我们的处理器函数。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

这是一个非常简单的函数,它会对其处理的所有记录抛出异常,但您可以采用此函数并将其扩展到任何其他类似情况。

为了将出错的记录发送到 DLT,我们需要提供以下配置。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

为了激活 DLQ,应用程序必须提供一个组名。 匿名消费者无法使用 DLQ 功能。 我们还需要通过将 Kafka 消费者绑定的 enableDLQ 属性设置为 true 来启用 DLQ。 最后,我们可以选择通过在 Kafka 消费者绑定上提供 dlqName 来提供 DLT 名称,否则在这种情况下它默认为 error.input-topic.my-group

请注意,在上面提供的示例消费者中,有效负载的类型是 byte[]。 默认情况下,Kafka 绑定器中的 DLQ 生产者期望有效负载类型为 byte[]。 如果不是这种情况,那么我们需要提供适当的序列化器配置。 例如,让我们将消费者函数重写如下:

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

现在,我们需要告诉 Spring Cloud Stream,当写入 DLT 时,我们希望如何序列化数据。 以下是此场景的修改配置:

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

带有高级重试选项的 DLQ

问题陈述

这与上面的秘诀类似,但作为开发人员,我希望配置重试的处理方式。

解决方案

如果您遵循了上述秘诀,那么当处理遇到错误时,您将获得 Kafka 绑定器中内置的默认重试选项。

默认情况下,绑定器最多重试 3 次,初始延迟为一秒,每次退避乘数为 2.0,最大延迟为 10 秒。 您可以按如下方式更改所有这些配置:

spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果您愿意,您还可以通过提供布尔值映射来提供可重试异常列表。 例如,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

默认情况下,上面列表中未列出的任何异常都将重试。 如果不希望如此,则可以通过提供以下内容来禁用:

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您还可以提供自己的 RetryTemplate 并将其标记为 @StreamRetryTemplate,它将被绑定器扫描并使用。 当您需要更复杂的重试策略和策略时,这很有用。

如果您有多个 @StreamRetryTemplate bean,那么您可以使用以下属性指定您的绑定想要哪个:

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

使用 DLQ 处理反序列化错误

问题陈述

我的处理器在 Kafka 消费者中遇到反序列化异常。 我希望 Spring Cloud Stream DLQ 机制能捕获这种情况,但它没有。 我该如何处理?

解决方案

Spring Cloud Stream 提供的正常 DLQ 机制在 Kafka 消费者抛出不可恢复的反序列化异常时将无济于事。 这是因为,此异常甚至在消费者的 poll() 方法返回之前就发生了。 Spring for Apache Kafka 项目提供了一些很好的方法来帮助绑定器处理这种情况。 让我们来探讨一下。

假设这是我们的函数:

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

这是一个简单的函数,它接受一个 String 参数。

我们希望绕过 Spring Cloud Stream 提供的消息转换器,而使用原生反序列化器。 对于 String 类型,这没有多大意义,但对于 AVRO 等更复杂的类型,您必须依赖外部反序列化器,因此希望将转换委托给 Kafka。

现在当消费者收到数据时,假设有一个导致反序列化错误的坏记录,例如,可能有人传递了一个 Integer 而不是 String。 在这种情况下,如果您不在应用程序中做任何事情,异常将通过链传播,您的应用程序最终将退出。

为了处理这种情况,您可以添加一个 ListenerContainerCustomizer @Bean,它配置一个 DefaultErrorHandler。 此 DefaultErrorHandler 配置了一个 DeadLetterPublishingRecoverer。 我们还需要为消费者配置一个 ErrorHandlingDeserializer。 这听起来很复杂,但实际上,在这种情况下它归结为这 3 个 bean。

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

让我们分析一下它们。 第一个是 ListenerContainerCustomizer bean,它接受一个 DefaultErrorHandler。 容器现在使用该特定的错误处理器进行自定义。 您可以在此处 了解更多关于容器自定义的信息

第二个 bean 是 DefaultErrorHandler,它配置为发布到 DLT。 有关 DefaultErrorHandler 的更多详细信息,请参阅此处 此处

第三个 bean 是 DeadLetterPublishingRecoverer,它最终负责发送到 DLT。 默认情况下,DLT 主题的名称为 ORIGINAL_TOPIC_NAME.DLT。 但是您可以更改它。 有关更多详细信息,请参阅 文档

我们还需要通过应用程序配置配置一个 ErrorHandlingDeserializer

ErrorHandlingDeserializer 委托给实际的反序列化器。 在发生错误时,它将记录的键/值设置为 null,并包含消息的原始字节。 然后它在头部设置异常,并将此记录传递给监听器,监听器再调用注册的错误处理器。

以下是所需的配置:

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我们通过绑定上的 configuration 属性提供 ErrorHandlingDeserializer。 我们还指示要委托的实际反序列化器是 StringDeserializer

请记住,上面列出的所有 dlq 属性都与此秘诀中的讨论无关。 它们纯粹是为了解决任何应用程序级别的错误。

Kafka 绑定器中的基本偏移量管理

问题陈述

我想编写一个 Spring Cloud Stream Kafka 消费者应用程序,但不确定它如何管理 Kafka 消费者偏移量。 您能解释一下吗?

解决方案

我们鼓励您阅读 文档 部分,以彻底了解它。

要点如下:

Kafka 默认支持两种类型的起始偏移量 - earliestlatest。 它们的语义从名称上看是自解释的。

假设您是第一次运行消费者。 如果您的 Spring Cloud Stream 应用程序中缺少 group.id,那么它就成为一个匿名消费者。 每当您有一个匿名消费者时,Spring Cloud Stream 应用程序默认将从主题分区中 latest 可用的偏移量开始。 另一方面,如果您明确指定 group.id,那么 Spring Cloud Stream 应用程序默认将从主题分区中 earliest 可用的偏移量开始。

在上述两种情况(具有显式组和匿名组的消费者)中,起始偏移量可以通过使用属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 并将其设置为 earliestlatest 来切换。

现在,假设您之前已经运行过消费者,现在再次启动它。 在这种情况下,上述情况下的起始偏移量语义不适用,因为消费者会找到一个已提交的消费者组偏移量(对于匿名消费者,尽管应用程序不提供 group.id,绑定器也会为您自动生成一个)。 它只是从上次提交的偏移量开始。 即使您提供了 startOffset 值,这也是如此。

但是,您可以通过使用 resetOffsets 属性来覆盖消费者从上次提交的偏移量开始的默认行为。 为此,请将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 设置为 true(默认值为 false)。 然后确保您提供 startOffset 值(earliestlatest)。 当您这样做并启动消费者应用程序时,每次启动时,它都会像第一次启动一样开始,并忽略分区的所有已提交偏移量。

在 Kafka 中寻找任意偏移量

问题陈述

使用 Kafka 绑定器,我知道它可以将偏移量设置为 earliestlatest,但我需要将偏移量寻找为中间的某个任意偏移量。 有没有办法使用 Spring Cloud Stream Kafka 绑定器实现这一点?

解决方案

前面我们看到了 Kafka 绑定器如何处理基本的偏移量管理。 默认情况下,绑定器不允许您回溯到任意偏移量,至少通过我们在该秘诀中看到的机制是不允许的。 但是,绑定器提供了一些低级策略来实现此用例。 让我们来探讨一下。

首先,当您想要重置到 earliestlatest 之外的任意偏移量时,请务必将 resetOffsets 配置保留为其默认值 false。 然后,您必须提供一个类型为 KafkaBindingRebalanceListener 的自定义 bean,它将被注入到所有消费者绑定中。 它是一个带有几个默认方法的接口,但我们感兴趣的方法是:

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

让我们看看详细信息。

本质上,此方法将在主题分区的初始分配期间或再平衡之后每次调用。 为了更好地说明,我们假设我们的主题是 foo,它有 4 个分区。 最初,我们只启动组中的一个消费者,此消费者将从所有分区消费。 当消费者第一次启动时,所有 4 个分区都会被初始分配。 但是,我们不希望分区从默认值开始消费(earliest,因为我们定义了一个组),而是希望每个分区在寻找任意偏移量后开始消费。 想象一下您有一个业务案例,需要从如下所示的某些偏移量开始消费。

Partition   start offset

0           1000
1           2000
2           2000
3           1000

这可以通过如下实现上述方法来实现。

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

这只是一个粗略的实现。 实际用例比这复杂得多,您需要相应地进行调整,但这肯定为您提供了一个基本草图。 当消费者 seek 失败时,它可能会抛出一些运行时异常,您需要决定在这些情况下如何处理。

[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我们启动第二个具有相同组 ID 的消费者会怎样?

当我们添加第二个消费者时,会发生再平衡,一些分区将被移动。 假设新消费者获得了分区 23。 当这个新的 Spring Cloud Stream 消费者调用 onPartitionsAssigned 方法时,它将看到这是该消费者上分区 23 的初始分配。 因此,它将由于 initial 参数上的条件检查而执行寻址操作。 对于第一个消费者,它现在只有分区 01。 但是,对于此消费者,它只是一个再平衡事件,不被视为初始分配。 因此,它不会由于 initial 参数上的条件检查而重新寻址到给定的偏移量。

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka 绑定器手动确认?

问题陈述

使用 Kafka 绑定器,我想在我的消费者中手动确认消息。 我该怎么做?

解决方案

默认情况下,Kafka 绑定器委托给 Spring for Apache Kafka 项目中的默认提交设置。 Spring Kafka 中的默认 ackModebatch。 有关更多详细信息,请参阅此处 此处

在某些情况下,您希望禁用此默认提交行为并依赖手动提交。 以下步骤允许您这样做。

将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode 设置为 MANUALMANUAL_IMMEDIATE。 当这样设置时,消息头中将存在一个名为 kafka_acknowledgment(来自 KafkaHeaders.ACKNOWLEDGMENT)的头,该头将由消费者方法接收。

例如,想象这是您的消费者方法。

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然后将属性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode 设置为 MANUALMANUAL_IMMEDIATE

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何在 Spring Cloud Stream 中覆盖默认绑定名称?

问题陈述

Spring Cloud Stream 根据函数定义和签名创建默认绑定,但我如何将它们覆盖为更符合领域友好的名称?

解决方案

假设以下是您的函数签名。

@Bean
public Function<String, String> uppercase(){
...
}

默认情况下,Spring Cloud Stream 将创建如下绑定。

  1. uppercase-in-0

  2. uppercase-out-0

您可以使用以下属性覆盖这些绑定。

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

此后,所有绑定属性都必须在新名称 my-transformer-inmy-transformer-out 上进行设置。

这是另一个使用 Kafka Streams 和多个输入的示例。

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

默认情况下,Spring Cloud Stream 将为该函数创建三个不同的绑定名称。

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次您想在这些绑定上设置某些配置时,都必须使用这些绑定名称。 您不喜欢这样,并且希望使用更符合领域友好且可读的绑定名称,例如。

  1. orders

  2. accounts

  3. enrichedOrders

您可以通过简单地设置这三个属性来轻松实现这一点

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

完成此操作后,它将覆盖默认绑定名称,您希望在它们上设置的任何属性都必须使用这些新的绑定名称。

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何将消息键作为记录的一部分发送?

问题陈述

我需要将键与记录的有效负载一起发送,Spring Cloud Stream 中有什么方法可以做到这一点吗?

解决方案

通常,您需要将关联数据结构(如映射)作为带有键和值的记录发送。 Spring Cloud Stream 允许您以直接的方式实现这一点。 以下是实现此目的的基本蓝图,但您可能需要根据您的特定用例进行调整。

这是一个示例生产者方法(又名 Supplier)。

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

这是一个简单的函数,它发送一个带有 String 有效负载但也有一个键的消息。 请注意,我们使用 KafkaHeaders.MESSAGE_KEY 将键设置为消息头。

如果您想更改键(从默认的 kafka_messageKey),那么在配置中,我们需要指定此属性:

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

请注意,我们使用绑定名称 supplier-out-0,因为这是我们的函数名称,请相应更新。

然后,当生成消息时,我们使用这个新键。

[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用原生序列化器和反序列化器,而不是 Spring Cloud Stream 完成的消息转换?

问题陈述

我不想使用 Spring Cloud Stream 中的消息转换器,而是想使用 Kafka 中的原生序列化器和反序列化器。 默认情况下,Spring Cloud Stream 使用其内置的消息转换器来处理此转换。 我如何绕过它并将责任委托给 Kafka?

解决方案

这真的很容易做到。

您所要做的就是提供以下属性以启用原生序列化。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然后,您还需要设置序列化器。 有几种方法可以做到这一点。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或者使用绑定器配置。

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

当使用绑定器方式时,它适用于所有绑定,而将它们设置在绑定级别是针对每个绑定的。

在反序列化方面,您只需将反序列化器作为配置提供即可。

例如,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您也可以在绑定器级别设置它们。

有一个可选属性可以强制原生解码。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,在 Kafka 绑定器的情况下,这是不必要的,因为当它到达绑定器时,Kafka 已经使用配置的反序列化器对其进行了反序列化。

解释 Kafka Streams 绑定器中偏移量重置的工作原理

问题陈述

默认情况下,Kafka Streams 绑定器始终从新消费者的最早偏移量开始。 有时,从最新偏移量开始对应用程序有利或必需。 Kafka Streams 绑定器允许您这样做。

解决方案

在我们看解决方案之前,让我们看以下场景。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我们有一个 BiConsumer bean,它需要两个输入绑定。 在这种情况下,第一个绑定用于 KStream,第二个绑定用于 KTable。 当第一次运行此应用程序时,默认情况下,两个绑定都从 earliest 偏移量开始。 如果由于某些要求我想从 latest 偏移量开始怎么办? 您可以通过启用以下属性来做到这一点。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想让一个绑定从 latest 偏移量开始,而另一个绑定从默认的 earliest 偏移量开始,那么请将后者绑定排除在配置之外。

请记住,一旦存在已提交的偏移量,这些设置将 生效,并且已提交的偏移量将优先。

跟踪 Kafka 中记录成功发送(生产)的情况

问题陈述

我有一个 Kafka 生产者应用程序,我希望跟踪所有成功的发送。

解决方案

让我们假设应用程序中有以下 supplier

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然后,我们需要定义一个新的 MessageChannel bean 来捕获所有成功的发送信息。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下来,在应用程序配置中定义此属性以提供 recordMetadataChannel 的 bean 名称。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此时,成功的发送信息将发送到 fooRecordChannel

您可以编写如下 IntegrationFlow 来查看信息。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle 方法中,有效负载是发送到 Kafka 的内容,消息头包含一个名为 kafka_recordMetadata 的特殊键。 它的值是一个 RecordMetadata,包含有关主题分区、当前偏移量等信息。

在 Kafka 中添加自定义头部映射器

问题陈述

我有一个 Kafka 生产者应用程序,它设置了一些头部,但在消费者应用程序中却丢失了。这是为什么?

解决方案

在正常情况下,这应该没问题。

想象一下,您有以下生产者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消费者端,您仍然应该看到头部 "foo",以下操作不应给您带来任何问题。

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在应用程序中提供一个 自定义头部映射器,那么这就不起作用了。 假设您的应用程序中有一个空的 KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果这是您的实现,那么您将在消费者端丢失 foo 头部。 您可能在这些 KafkaHeaderMapper 方法中有一些逻辑。 您需要以下内容来填充 foo 头部。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

这将正确地将 foo 头部从生产者填充到消费者。

关于 id 头的特别说明

在 Spring Cloud Stream 中,id 头是一个特殊的头,但有些应用程序可能希望拥有特殊的自定义 id 头——例如 custom-idIDId。 第一个 (custom-id) 将在没有自定义头映射器的情况下从生产者传播到消费者。 但是,如果您使用框架保留的 id 头的变体(例如 IDIdiD 等)进行生产,那么您将遇到框架内部的问题。 有关此用例的更多上下文,请参阅此 StackOverflow 线程。 在这种情况下,您必须使用自定义 KafkaHeaderMapper 来映射区分大小写的 id 头。 例如,假设您有以下生产者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上面的 Id 头将从消费端消失,因为它与框架 id 头冲突。 您可以提供一个自定义 KafkaHeaderMapper 来解决此问题。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

通过这样做,idId 头都将从生产者到消费者端可用。

在事务中向多个主题生产消息

问题陈述

我如何以事务方式向多个 Kafka 主题生产消息?

有关更多上下文,请参阅此 StackOverflow 问题

解决方案

使用 Kafka 绑定器中的事务支持进行事务处理,然后提供一个 AfterRollbackProcessor。 为了向多个主题生产消息,请使用 StreamBridge API。

以下是实现此目的的代码片段:

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

为了测试,您可以使用以下内容:

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要说明:

请确保您的应用程序配置中没有任何 DLQ 设置,因为我们手动配置 DLT(默认情况下,它将发布到基于初始消费者函数的名为 input.DLT 的主题)。 此外,将消费者绑定上的 maxAttempts 重置为 1,以避免绑定器重试。 在上面的示例中,它将总共重试 3 次(初始尝试 + FixedBackoff 中的 2 次尝试)。

有关如何测试此代码的更多详细信息,请参阅 StackOverflow 线程。 如果您正在使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请务必将消费者绑定上的 isolation-level 设置为 read-committed

StackOverflow 线程 也与此讨论相关。

运行多个可轮询消费者时要避免的陷阱

问题陈述

我如何运行可轮询消费者的多个实例并为每个实例生成唯一的 client.id

解决方案

假设我有以下定义:

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

当运行应用程序时,Kafka 消费者会生成一个 client.id(类似于 consumer-my-group-1)。 对于正在运行的应用程序的每个实例,此 client.id 将相同,导致意外问题。

为了解决这个问题,您可以在应用程序的每个实例上添加以下属性:

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

有关更多详细信息,请参阅此 GitHub 问题