编程模型的辅助功能

单个应用程序中的多个 Kafka Streams 处理器

Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。 您可以拥有一个如下所示的应用程序。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在这种情况下,binder 将创建 3 个独立的 Kafka Streams 对象,它们具有不同的应用程序 ID(更多内容见下文)。 但是,如果应用程序中有一个以上的处理器,您必须告知 Spring Cloud Stream 哪些函数需要被激活。 以下是激活函数的方法。

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess

如果您希望某些函数不立即激活,可以将其从列表中移除。

当您在同一个应用程序中有一个 Kafka Streams 处理器和通过不同 binder 处理的其他类型的 Function bean(例如,基于常规 Kafka 消息通道 binder 的函数 bean)时,这也是正确的。

Kafka Streams 应用程序 ID

应用程序 ID 是 Kafka Streams 应用程序必须提供的属性。 Spring Cloud Stream Kafka Streams binder 允许您以多种方式配置此应用程序 ID。

如果应用程序中只有一个处理器,则可以使用以下属性在 binder 级别设置此 ID:

spring.cloud.stream.kafka.streams.binder.applicationId

为方便起见,如果只有一个处理器,您还可以使用 spring.application.name 作为属性来委托应用程序 ID。

如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。 在函数模型中,您可以将其作为属性附加到每个函数。

例如,假设您有以下函数。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后,您可以使用以下 binder 级别属性为每个函数设置应用程序 ID。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。 但是,如果您使用函数模型,如上所示在 binder 级别为每个函数设置应用程序 ID 会容易得多。

对于生产部署,强烈建议通过配置显式指定应用程序 ID。 如果您正在自动扩展应用程序,这一点尤其关键,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。

如果应用程序未提供应用程序 ID,则 binder 将为您自动生成一个静态应用程序 ID。 这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。 以这种方式生成的应用程序 ID 在应用程序重启后将保持静态。 在函数模型中,生成的应用程序 ID 将是函数 bean 名称后跟字面量 applicationID,例如,如果 process 是函数 bean 名称,则为 process-applicationID

设置应用程序 ID 摘要

  • 默认情况下,binder 将为每个函数方法自动生成应用程序 ID。

  • 如果只有一个处理器,则可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果有多个处理器,则可以使用属性 spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId 为每个函数设置应用程序 ID。

使用函数式风格覆盖 binder 生成的默认绑定名称

默认情况下,当使用函数式风格时,binder 使用上述策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。 如果您想覆盖这些绑定名称,可以通过指定以下属性来实现。

spring.cloud.stream.function.bindings.<default binding name>。默认绑定名称是 binder 生成的原始绑定名称。

例如,假设您有这个函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 将生成名为 process-in-0process-in-1process-out-0 的绑定。 现在,如果您想将它们更改为完全不同的名称,例如更具领域特定性的绑定名称,则可以按如下方式操作。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之后,您必须在这些新绑定名称上设置所有绑定级别属性。

请记住,对于上述函数式编程模型,在大多数情况下,遵循默认绑定名称是有意义的。 您可能仍希望进行此覆盖的唯一原因是当您有大量配置属性并且希望将绑定映射到更具领域友好性的名称时。

设置引导服务器配置

运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。 如果您不提供此信息,binder 期望您在默认的 localhost:9092 运行代理。 如果不是这种情况,则需要覆盖它。有两种方法可以做到这一点。

  • 使用 boot 属性 - spring.kafka.bootstrapServers

  • Binder 级别属性 - spring.cloud.stream.kafka.streams.binder.brokers

就 binder 级别属性而言,无论您是否使用通过常规 Kafka binder 提供的代理属性 - spring.cloud.stream.kafka.binder.brokers 都没有关系。 Kafka Streams binder 将首先检查是否设置了 Kafka Streams binder 特定代理属性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,它会查找 spring.cloud.stream.kafka.binder.brokers