Partition support on the outbound

A Kafka Streams processor usually sends its processed output to an outbound Kafka topic. If the outbound topic is partitioned and the processor needs to send outgoing data to particular partitions, the application needs to provide a bean of type StreamPartitioner. See the StreamPartitioner documentation for more details. Let's look at an example.

This is the same processor we have already seen several times:

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

Here is the output binding destination:

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

Suppose the topic outputTopic has 4 partitions. If you don't provide a partitioning strategy, Kafka Streams falls back to its default partitioning strategy (for keyed records, the partition is derived from a hash of the serialized key), which, depending on your use case, may not produce the outcome you want. Let's say you want to send any key that matches spring to partition 0, cloud to partition 1, stream to partition 2, and everything else to partition 3. This is what you need to do in the application:

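@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    // StreamPartitioner (org.apache.kafka.streams.processor) is given the topic name,
    // the record key and value, and the number of partitions of the topic.
    return (topic, key, value, numPartitions) -> {
        // "spring".equals(key) instead of key.equals("spring"): a null key then falls
        // through to the last branch instead of throwing a NullPointerException.
        if ("spring".equals(key)) {
            return 0;
        }
        else if ("cloud".equals(key)) {
            return 1;
        }
        else if ("stream".equals(key)) {
            return 2;
        }
        else {
            // Every other key goes to the last of the four partitions.
            return 3;
        }
    };
}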
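
The routing can be checked without a running Kafka cluster by calling the lambda directly. The sketch below copies the same rules into a plain Java class. KeyPartitioner is a hypothetical local interface with the same partition(topic, key, value, numPartitions) shape as StreamPartitioner, declared here only so the example compiles without kafka-streams on the classpath, and Long stands in for the WordCount value type.

```java
import java.util.Arrays;

// Hypothetical stand-in mirroring StreamPartitioner#partition; not the Kafka type itself.
@FunctionalInterface
interface KeyPartitioner<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);
}

class KeywordPartitionerDemo {

    // Same routing rules as the streamPartitioner bean; Long replaces WordCount.
    static final KeyPartitioner<String, Long> KEYWORD_PARTITIONER = (topic, key, value, numPartitions) -> {
        if ("spring".equals(key)) {
            return 0;
        }
        else if ("cloud".equals(key)) {
            return 1;
        }
        else if ("stream".equals(key)) {
            return 2;
        }
        else {
            return 3;
        }
    };

    public static void main(String[] args) {
        // Arrays.asList accepts a null element (List.of would reject it).
        for (String key : Arrays.asList("spring", "cloud", "stream", "kafka", null)) {
            System.out.println(key + " -> partition "
                    + KEYWORD_PARTITIONER.partition("outputTopic", key, 1L, 4));
        }
    }
}
```

Because the comparisons are written as "literal".equals(key), the null key in the loop lands in partition 3 rather than failing.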

This is a rudimentary implementation. However, the partitioner has access to the record's key and value, the topic name, and the total number of partitions, so you can implement more complex partitioning strategies if needed.
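
As one illustration of using numPartitions, here is a minimal sketch of a strategy that gives each of a few reserved keys its own partition and spreads all other keys over the remaining partitions. The RESERVED table and the RecordPartitioner interface are hypothetical (the interface only mirrors the shape of StreamPartitioner#partition so the sketch compiles without Kafka), and Long stands in for WordCount. With 4 partitions it behaves like the bean above; with more partitions, the "everything else" traffic no longer piles onto a single partition.

```java
import java.util.Map;

// Hypothetical stand-in mirroring StreamPartitioner#partition; not the Kafka type itself.
@FunctionalInterface
interface RecordPartitioner<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);
}

class ReservedKeyPartitionerDemo {

    // Hypothetical routing table: each of these keys owns one dedicated partition.
    static final Map<String, Integer> RESERVED = Map.of("spring", 0, "cloud", 1, "stream", 2);

    static final RecordPartitioner<String, Long> PARTITIONER = (topic, key, value, numPartitions) -> {
        // Map.of(...) throws on a null lookup, so guard against null keys first.
        Integer reserved = (key == null) ? null : RESERVED.get(key);
        if (reserved != null && reserved < numPartitions) {
            return reserved;
        }
        // String.hashCode() is specified by the JDK, so the result is stable across JVMs.
        int hash = (key == null) ? 0 : key.hashCode();
        int shared = numPartitions - RESERVED.size();
        if (shared <= 0) {
            // Too few partitions for a shared range: spread over all partitions instead.
            return Math.floorMod(hash, numPartitions);
        }
        // All other keys share the partitions that follow the reserved ones.
        return RESERVED.size() + Math.floorMod(hash, shared);
    };

    public static void main(String[] args) {
        for (int n : new int[] {4, 6}) {
            for (String key : new String[] {"spring", "stream", "kafka", "streams"}) {
                System.out.println(n + " partitions: " + key + " -> "
                        + PARTITIONER.partition("outputTopic", key, 1L, n));
            }
        }
    }
}
```

Math.floorMod is used instead of % so that negative hash codes still map to a valid, non-negative partition index.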

You also need to specify this bean name in the application configuration:

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

Each output topic in the application must be configured separately in this way.
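
For a processor with more than one output, each output binding (process-out-0, process-out-1, and so on) gets its own streamPartitionerBeanName entry. The sketch below only builds those property keys as plain strings to make the per-binding naming pattern explicit; the second binding and the bean name auditPartitioner are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionerBindingConfig {

    // Property key for the StreamPartitioner bean of one output binding.
    static String partitionerPropertyKey(String bindingName) {
        return "spring.cloud.stream.kafka.streams.bindings." + bindingName
                + ".producer.streamPartitionerBeanName";
    }

    // One property entry per output binding, preserving the input order.
    static Map<String, String> partitionerProperties(Map<String, String> beanNameByBinding) {
        Map<String, String> props = new LinkedHashMap<>();
        beanNameByBinding.forEach((binding, bean) -> props.put(partitionerPropertyKey(binding), bean));
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> beans = new LinkedHashMap<>();
        beans.put("process-out-0", "streamPartitioner");
        beans.put("process-out-1", "auditPartitioner"); // hypothetical second output and bean
        partitionerProperties(beans).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

Running main prints one line per binding in the same key: value form used in the configuration shown above.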