Spring Cloud Stream Apache Pulsar 绑定器
Spring for Apache Pulsar 提供了一个 Spring Cloud Stream 绑定器,我们可以用它来使用发布-订阅范式构建事件驱动的微服务。 在本节中,我们将详细介绍此绑定器的基本信息。
用法
为了在 Spring Cloud Stream 中使用 Apache Pulsar 绑定器,我们需要在应用程序中包含以下依赖项。
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}
概述
Spring Cloud Stream Apache Pulsar 绑定器允许应用程序专注于业务逻辑,而不是处理管理和维护 Pulsar 的底层细节。 绑定器为应用程序开发人员处理所有这些细节。 Spring Cloud Stream 带来了基于 Spring Cloud Function 的强大编程模型,允许应用程序开发人员使用函数式风格编写复杂的事件驱动应用程序。 应用程序可以从中立于中间件的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目的地。 Spring Cloud Stream 构建在 Spring Boot 之上,当使用 Spring Cloud Stream 编写事件驱动的微服务时,您本质上是在编写一个 Boot 应用程序。 这是一个简单的 Spring Cloud Stream 应用程序。
@SpringBootApplication
public class SpringPulsarBinderSampleApp {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
}
@Bean
public Supplier<Time> timeSupplier() {
return () -> new Time(String.valueOf(System.currentTimeMillis()));
}
@Bean
public Function<Time, EnhancedTime> timeProcessor() {
return (time) -> {
EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
return enhancedTime;
};
}
@Bean
public Consumer<EnhancedTime> timeLogger() {
return (time) -> this.logger.info("SINK: {}", time);
}
record Time(String time) {
}
record EnhancedTime(Time time, String extra) {
}
}
上述示例应用程序是一个成熟的 Spring Boot 应用程序,需要一些解释。但是,首先,您可以看到这只是普通的 Java 和一些 Spring 和 Spring Boot 注解。
我们这里有三个 Bean
方法——一个 java.util.function.Supplier
,一个 java.util.function.Function
,最后是一个 java.util.function.Consumer
。
Supplier
产生当前时间的毫秒数,Function
接收此时间并通过添加一些随机数据来增强它,然后 Consumer
记录增强后的时间。
为了简洁起见,我们省略了所有导入,但整个应用程序中没有任何 Spring Cloud Stream 特定的内容。 它如何成为一个与 Apache Pulsar 交互的 Spring Cloud Stream 应用程序? 您必须在应用程序中包含上述绑定器依赖项。 添加该依赖项后,您必须提供以下配置属性。
spring:
cloud:
function:
definition: timeSupplier;timeProcessor;timeLogger;
stream:
bindings:
timeProcessor-in-0:
destination: timeSupplier-out-0
timeProcessor-out-0:
destination: timeProcessor-out-0
timeLogger-in-0:
destination: timeProcessor-out-0
通过这个,上面的 Spring Boot 应用程序已经成为一个基于 Spring Cloud Stream 的端到端事件驱动应用程序。
因为我们的类路径上有 Pulsar 绑定器,所以应用程序与 Apache Pulsar 交互。
如果应用程序中只有一个函数,那么我们不需要告诉 Spring Cloud Stream 激活该函数执行,因为它默认会这样做。
如果应用程序中有多个这样的函数,如我们的示例所示,我们需要指示 Spring Cloud Stream 激活哪些函数。
在我们的例子中,我们需要激活所有这些函数,我们通过 spring.cloud.function.definition
属性来实现。
Bean 名称默认成为 Spring Cloud Stream 绑定名称的一部分。
绑定是 Spring Cloud Stream 中的一个基本抽象概念,框架通过它与中间件目的地通信。
Spring Cloud Stream 所做的一切几乎都发生在具体的绑定之上。
Supplier
只有输出绑定;Function
有输入和输出绑定,而 Consumer
只有输入绑定。
让我们以我们的 Supplier
bean——timeSupplier
为例。
此 Supplier
的默认绑定名称将是 timeSupplier-out-0
。
类似地,timeProcessor
函数的默认绑定名称在入站时将是 timeProcessor-in-0
,在出站时将是 timeProcessor-out-0
。
有关如何更改默认绑定名称的详细信息,请参阅 Spring Cloud Stream 参考文档。
在大多数情况下,使用默认绑定名称就足够了。
我们如上所示设置绑定名称上的目的地。
如果未提供目的地,则绑定名称成为目的地的值,如 timeSupplier-out-0
的情况。
运行上述应用程序时,您应该看到 Supplier
每秒执行一次,然后由 Function
消费并增强,最后由 Logger Consumer
消费。
基于绑定器的应用程序中的消息转换
在上述示例应用程序中,我们没有提供任何消息转换的模式信息。
这是因为,默认情况下,Spring Cloud Stream 使用其消息转换机制,该机制使用 Spring Framework 通过 Spring Messaging 项目建立的消息支持。
除非另有说明,否则 Spring Cloud Stream 在入站和出站绑定上都使用 application/json
作为消息转换的 content-type
。
在出站时,数据被序列化为 byte[]
,然后 Pulsar 绑定器使用 Schema.BYTES
将其通过网络发送到 Pulsar 主题。
类似地,在入站时,数据作为 byte[]
从 Pulsar 主题中消费,然后使用适当的消息转换器转换为目标类型。
在 Pulsar 中使用 Pulsar Schema 进行原生转换
尽管默认是使用框架提供的消息转换,但 Spring Cloud Stream 允许每个绑定器确定消息应该如何转换。 如果应用程序选择走这条路线,Spring Cloud Stream 会避免使用任何 Spring 提供的消息转换功能,并传递它接收或生成的数据。 Spring Cloud Stream 中的此功能被称为生产者端的原生编码和消费者端的原生解码。这意味着编码和解码原生发生在目标中间件上,在我们的例子中是 Apache Pulsar。 对于上述应用程序,我们可以使用以下配置来绕过框架转换并使用原生编码和解码。
spring:
cloud:
stream:
bindings:
timeSupplier-out-0:
producer:
use-native-encoding: true
timeProcessor-in-0:
destination: timeSupplier-out-0
consumer:
use-native-decoding: true
timeProcessor-out-0:
destination: timeProcessor-out-0
producer:
use-native-encoding: true
timeLogger-in-0:
destination: timeProcessor-out-0
consumer:
use-native-decoding: true
pulsar:
bindings:
timeSupplier-out-0:
producer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-in-0:
consumer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-out-0:
producer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
timeLogger-in-0:
consumer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
在生产者端启用原生编码的属性是核心 Spring Cloud Stream 的绑定级别属性。
您在生产者绑定上设置它——spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding
并将其设置为 true
。
类似地,对于消费者绑定,使用 spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding
并将其设置为 true
。
如果我们决定使用原生编码和解码,在 Pulsar 的情况下,我们需要设置相应的模式和底层消息类型信息。
此信息作为扩展绑定属性提供。
如您在配置中所示,属性是——spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type
用于模式信息,spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type
用于实际目标类型。
如果消息同时包含键和值,您可以使用 message-key-type
和 message-value-type
来指定它们的目标类型。
当省略 |
消息头转换
每条消息通常都包含需要在 Pulsar 和 Spring Messaging 之间通过 Spring Cloud Stream 输入和输出绑定进行传输的头信息。 为了支持这种传输,框架处理必要的消息头转换。
自定义头映射器
Pulsar 绑定器配置了一个默认的头映射器,可以通过提供您自己的 PulsarHeaderMapper
bean 来覆盖它。
在以下示例中,配置了一个 JSON 头映射器,它:
-
映射所有入站头(除了键为“
top
”或“secret
”的头) -
映射出站头(除了键为“
id
”、“timestamp
”或“userId
”的头) -
仅信任“
com.acme
”包中的对象进行出站反序列化 -
对任何“
com.acme.Money
”头值进行简单toString()
编码的序列化/反序列化
@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
return JsonPulsarHeaderMapper.builder()
.inboundPatterns("!top", "!secret", "*")
.outboundPatterns("!id", "!timestamp", "!userId", "*")
.trustedPackages("com.acme")
.toStringClasses("com.acme.Money")
.build();
}
在绑定器中使用 Pulsar 属性
绑定器使用 Spring for Apache Pulsar 框架的基本组件来构建其生产者和消费者绑定。
由于基于绑定器的应用程序是 Spring Boot 应用程序,因此绑定器默认使用 Spring for Apache Pulsar 的 Spring Boot 自动配置。
因此,核心框架级别可用的所有 Pulsar Spring Boot 属性也通过绑定器可用。
例如,您可以使用前缀为 spring.pulsar.producer…
、spring.pulsar.consumer…
等的属性。
此外,您还可以在绑定器级别设置这些 Pulsar 属性。
例如,这将同样有效——spring.cloud.stream.pulsar.binder.producer…
或 spring.cloud.stream.pulsar.binder.consumer…
。
上述两种方法都可以,但是当使用这些属性时,它会应用于整个应用程序。
如果应用程序中有多个函数,它们都将获得相同的属性。
您还可以将这些 Pulsar 属性设置在扩展绑定属性级别来解决这个问题。
扩展绑定属性应用于绑定本身。
例如,如果您有一个输入和输出绑定,并且两者都需要单独的 Pulsar 属性集,则必须在扩展绑定上设置它们。
生产者绑定的模式是 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…
。
类似地,对于消费者绑定,模式是 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…
。
通过这种方式,您可以在同一个应用程序中为不同的绑定应用一套单独的 Pulsar 属性。
扩展绑定属性的优先级最高。
在绑定器中应用属性的优先级顺序是 扩展绑定属性 → 绑定器属性 → Spring Boot 属性
(从高到低)。
Pulsar 绑定器属性的资源
以下是一些可以依赖的资源,以了解有关 Pulsar 绑定器提供的属性的更多信息。
Pulsar 生产者绑定配置。
这些属性需要 spring.cloud.stream.bindings.<binding-name>.producer
前缀。
所有 Spring Boot 提供的 Pulsar 生产者属性 也可通过此配置类获得。
Pulsar 消费者绑定配置。
这些属性需要 spring.cloud.stream.bindings.<binding-name>.consumer
前缀。
所有 Spring Boot 提供的 Pulsar 消费者属性 也可通过此配置类获得。
有关常见的 Pulsar 绑定器特定配置属性,请参阅 此链接。这些属性需要 spring.cloud.stream.pulsar.binder
前缀。
上述指定的生产者和消费者属性(包括 Spring Boot 属性)可以在绑定器中使用 spring.cloud.stream.pulsar.binder.producer
或 spring.cloud.stream.pulsar.binder.consumer
前缀。
Pulsar 主题供应器
Spring Cloud Stream Apache Pulsar 绑定器附带一个开箱即用的 Pulsar 主题供应器。
当运行应用程序时,如果缺少必要的主题,Pulsar 将为您创建主题。
然而,这是一个基本的非分区主题,如果您想要像创建分区主题这样的高级功能,您可以依赖绑定器中的主题供应器。
Pulsar 主题供应器使用框架中的 PulsarAdministration
,它使用 PulsarAdminBuilder
。
因此,您需要设置 spring.pulsar.administration.service-url
属性,除非您在默认服务器和端口上运行 Pulsar。
创建主题时指定分区计数
创建主题时,您可以通过两种方式设置分区计数。
首先,您可以在绑定器级别使用属性 spring.cloud.stream.pulsar.binder.partition-count
进行设置。
正如我们上面看到的,这样做将使应用程序创建的所有主题都继承此属性。
假设您想要在绑定级别对设置分区进行精细控制。
在这种情况下,您可以使用格式 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count
为每个绑定设置 partition-count
属性。
这样,同一应用程序中不同函数创建的各种主题将根据应用程序要求具有不同的分区。