Partition support on the outbound
A Kafka Streams processor usually sends its processed output to an outbound Kafka topic.
If the outbound topic is partitioned and the processor needs to send outgoing data to particular partitions, the application needs to provide a bean of type StreamPartitioner.
See StreamPartitioner for more details.
Let's look at some examples.
This is the same processor we have already seen multiple times:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
Here is the output binding destination:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
Suppose the topic outputTopic has 4 partitions. If you don't provide a partitioning strategy, Kafka Streams uses its default partitioning strategy, which may not produce the outcome you want for your particular use case.
Let's say you want to send any record whose key matches spring to partition 0, cloud to partition 1, stream to partition 2, and everything else to partition 3.
This is what you need to do in the application:
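@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    // t = topic name, k = record key, v = record value, n = number of partitions
    return (t, k, v, n) -> {
        // Constant-first equals() avoids a NullPointerException on null keys.
        if ("spring".equals(k)) {
            return 0;
        }
        else if ("cloud".equals(k)) {
            return 1;
        }
        else if ("stream".equals(k)) {
            return 2;
        }
        else {
            return 3;
        }
    };
}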
This is a rudimentary implementation. However, the partitioner has access to the key and value of the record, the topic name, and the total number of partitions, so you can implement more complex partitioning strategies if need be.
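As an illustration of a strategy that uses the partition count, the following is a minimal sketch rather than the binder's own implementation. It pins the three keys from the example above and spreads every other key by hash across the remaining partitions. To stay runnable without Kafka on the classpath, it uses a hypothetical KeyPartitioner interface that only mirrors the four lambda arguments (topic, key, value, number of partitions); the class name PartitionSketch and the method pinnedThenHashed are likewise assumptions made for this example.

```java
import java.util.Map;

public class PartitionSketch {

    // Hypothetical stand-in that mirrors the (topic, key, value, numPartitions)
    // arguments of the lambda used for the StreamPartitioner bean above.
    @FunctionalInterface
    public interface KeyPartitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    // Keys pinned to fixed partitions, as in the example above.
    static final Map<String, Integer> PINNED =
            Map.of("spring", 0, "cloud", 1, "stream", 2);

    public static <V> KeyPartitioner<String, V> pinnedThenHashed() {
        return (topic, key, value, numPartitions) -> {
            // Map.of(...) rejects null lookups, so guard null keys first.
            Integer pinned = (key == null) ? null : PINNED.get(key);
            if (pinned != null && pinned < numPartitions) {
                return pinned;
            }
            // Reserve the pinned partitions, but always leave at least one
            // partition for all other keys.
            int reserved = Math.min(PINNED.size(), numPartitions - 1);
            int spread = numPartitions - reserved;
            int hash = (key == null) ? 0 : key.hashCode();
            // floorMod keeps the result non-negative for negative hash codes.
            return reserved + Math.floorMod(hash, spread);
        };
    }
}
```

With 4 partitions this behaves like the rudimentary version (all other keys land on partition 3); with more partitions, the other keys are distributed over partitions 3 and up. The same lambda body could be returned from the StreamPartitioner bean, assuming the Kafka Streams version in use accepts a four-argument lambda there as shown earlier.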
You also need to provide the name of this bean in the application configuration:
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
Each output topic in the application needs to be configured separately like this.