Spring Cloud Stream Schema Registry
简介
当组织拥有基于消息的发布/订阅架构,并且多个生产者和消费者微服务相互通信时,通常需要所有这些微服务就一个基于模式的契约达成一致。 当这样的模式需要演进以适应新的业务需求时,现有组件仍然需要继续工作。 Spring Cloud Stream 支持一个独立的模式注册中心服务器,通过该服务器可以注册上述模式并供应用程序使用。 Spring Cloud Stream 模式注册中心支持还提供了基于 Avro 的模式注册中心客户端支持,这些客户端本质上提供了消息转换器,它们在消息转换期间与模式注册中心通信以协调模式。 Spring Cloud Stream 提供的模式演进支持既适用于上述独立的模式注册中心,也适用于 Confluent 提供的专门与 Apache Kafka 配合使用的模式注册中心。
Spring Cloud Stream 模式注册中心概述
Spring Cloud Stream 模式注册中心支持模式演进,以便数据可以随着时间的推移而演进,并且仍然可以与旧的或新的生产者和消费者协同工作,反之亦然。 大多数序列化模型,特别是那些旨在跨不同平台和语言实现可移植性的模型,都依赖于描述数据如何以二进制负载序列化的模式。 为了序列化数据然后解释它,发送方和接收方都必须能够访问描述二进制格式的模式。 在某些情况下,模式可以在序列化时从负载类型推断出来,或者在反序列化时从目标类型推断出来。 然而,许多应用程序受益于能够访问描述二进制数据格式的显式模式。 模式注册中心允许您以文本格式(通常是 JSON)存储模式信息,并使该信息可供需要它的各种应用程序访问,以二进制格式接收和发送数据。 模式可以作为由以下部分组成的元组进行引用:
-
主题,即模式的逻辑名称
-
模式版本
-
模式格式,描述数据的二进制格式
Spring Cloud Stream 模式注册中心提供以下组件:
-
独立的模式注册中心服务器默认情况下,它使用 H2 数据库,但通过提供适当的数据源配置,服务器可以与 PostgreSQL 或 MySQL 一起使用。
-
能够通过与模式注册中心通信来编组消息的模式注册中心客户端。目前,客户端可以与独立的模式注册中心或 Confluent 模式注册中心通信。
模式注册中心客户端
与模式注册中心服务器交互的客户端抽象是 SchemaRegistryClient
接口,它具有以下结构:
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream 为与其自己的模式服务器交互以及与 Confluent 模式注册中心交互提供了开箱即用的实现。
可以使用 @EnableSchemaRegistryClient
配置 Spring Cloud Stream 模式注册中心的客户端,如下所示:
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
默认转换器不仅优化了对远程服务器模式的缓存,还优化了 |
模式注册中心客户端属性
模式注册中心客户端支持以下属性:
spring.cloud.stream.schemaRegistryClient.endpoint
-
模式服务器的位置。 设置此项时,请使用完整的 URL,包括协议(
http
或https
)、端口和上下文路径。 - 默认值
spring.cloud.stream.schemaRegistryClient.cached
-
客户端是否应缓存模式服务器响应。 通常设置为
false
,因为缓存发生在消息转换器中。 使用模式注册中心客户端的客户端应将其设置为true
。 - 默认值
-
false
Avro 模式注册中心客户端消息转换器
对于已在应用程序上下文中注册 SchemaRegistryClient
bean 的应用程序,Spring Cloud Stream 会为模式管理自动配置 Apache Avro 消息转换器。
这简化了模式演进,因为接收消息的应用程序可以轻松访问写入器模式,该模式可以与它们自己的读取器模式协调。
对于出站消息,如果绑定的内容类型设置为 application/*+avro
,则 MessageConverter
将被激活,如以下示例所示:
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
在出站转换期间,消息转换器尝试推断每个出站消息的模式(基于其类型),并使用 SchemaRegistryClient
将其注册到主题(基于负载类型)。
如果已找到相同的模式,则检索对其的引用。
如果未找到,则注册模式并提供新的版本号。
消息使用以下方案的 contentType
标头发送:application/[prefix].[subject].v[version]+avro
,其中 prefix
是可配置的,subject
是从负载类型推断出来的。
例如,类型为 User
的消息可能作为二进制负载发送,其内容类型为 application/vnd.user.v2+avro
,其中 user
是主题,2
是版本号。
接收消息时,转换器从传入消息的标头推断模式引用并尝试检索它。该模式用作反序列化过程中的写入器模式。
Avro 模式注册中心消息转换器属性
如果您已通过设置 spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
启用了基于 Avro 的模式注册中心客户端,则可以通过设置以下属性来自定义注册行为。
- spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
-
如果您希望转换器使用反射从 POJO 推断模式,则启用此功能。
默认值:
false
- spring.cloud.stream.schema.avro.readerSchema
-
Avro 通过查看写入器模式(原始负载)和读取器模式(您的应用程序负载)来比较模式版本。有关更多信息,请参阅 Avro 文档。 如果设置,这将覆盖模式服务器上的任何查找,并使用本地模式作为读取器模式。 默认值:
null
- spring.cloud.stream.schema.avro.schemaLocations
-
将此属性中列出的任何
.avsc
文件注册到模式服务器。默认值:
empty
- spring.cloud.stream.schema.avro.prefix
-
内容类型标头中使用的前缀。
默认值:
vnd
- spring.cloud.stream.schema.avro.subjectNamingStrategy
-
确定用于在模式注册中心注册 Avro 模式的主题名称。有两种实现可用,
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
,其中主题是模式名称,以及org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
,它使用 Avro 模式命名空间和名称返回完全限定的主题。可以通过实现org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
来创建自定义策略。默认值:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
- spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
-
忽略任何模式注册中心通信。这对于测试目的很有用,这样在运行单元测试时,它就不会不必要地尝试连接到模式注册中心服务器。
默认值:
false
Apache Avro 消息转换器
Spring Cloud Stream 通过其 spring-cloud-stream-schema-registry-client
模块提供对基于模式的消息转换器的支持。
目前,开箱即用的基于模式的消息转换器唯一支持的序列化格式是 Apache Avro,未来版本将添加更多格式。
spring-cloud-stream-schema-registry-client
模块包含两种类型的消息转换器,可用于 Apache Avro 序列化:
-
使用序列化或反序列化对象的类信息或在启动时已知位置的模式的转换器。
-
使用模式注册中心的转换器。它们在运行时定位模式,并随着域对象的演进动态注册新模式。
支持模式的转换器
AvroSchemaMessageConverter
支持通过使用预定义模式或使用类中可用的模式信息(通过反射或包含在 SpecificRecord
中)来序列化和反序列化消息。
如果您提供自定义转换器,则不会创建默认的 AvroSchemaMessageConverter bean。
以下示例显示了一个自定义转换器:
要使用自定义转换器,您可以简单地将其添加到应用程序上下文,可以选择指定一个或多个与其关联的 MimeTypes
。
默认 MimeType
是 application/avro
。
如果转换的目标类型是 GenericRecord
,则必须设置模式。
以下示例显示了如何在接收器应用程序中配置转换器,通过注册不带预定义模式的 Apache Avro MessageConverter
。
在此示例中,请注意 mime 类型值为 avro/bytes
,而不是默认的 application/avro
。
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
相反,以下应用程序注册了一个带预定义模式(在类路径中找到)的转换器:
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
模式注册中心服务器
Spring Cloud Stream 提供了一个模式注册中心服务器实现。
要使用它,您可以下载最新的 spring-cloud-stream-schema-registry-server
版本并将其作为独立应用程序运行:
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
您可以将模式注册中心嵌入到现有的 Spring Boot Web 应用程序中。
为此,请将
|
spring.cloud.stream.schema.server.path
属性可用于控制模式服务器的根路径(尤其是在嵌入到其他应用程序中时)。
spring.cloud.stream.schema.server.allowSchemaDeletion
布尔属性启用模式的删除。默认情况下,此功能是禁用的。
模式注册中心服务器使用关系数据库存储模式。 默认情况下,它使用嵌入式数据库。 您可以通过使用 Spring Boot SQL 数据库和 JDBC 配置选项来自定义模式存储。
模式注册中心服务器 API
模式注册中心服务器 API 包含以下操作:
-
POST /
— 请参阅注册新模式
-
GET /{subject}/{format}/{version}
— 请参阅按主题、格式和版本检索现有模式
-
GET /{subject}/{format}
— 请参阅按主题和格式检索现有模式
-
GET /schemas/{id}
— 请参阅按 ID 检索现有模式
-
DELETE /{subject}/{format}/{version}
— 请参阅按主题、格式和版本删除模式
-
DELETE /schemas/{id}
— 请参阅按 ID 删除模式
-
DELETE /{subject}
— 请参阅按主题删除模式
注册新模式
要注册新模式,请向 /
端点发送 POST
请求。
/
接受一个 JSON 负载,其中包含以下字段:
-
subject
: 模式主题 -
format
: 模式格式 -
definition
: 模式定义
其响应是 JSON 格式的模式对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
按主题、格式和版本检索现有模式
要按主题、格式和版本检索现有模式,请向 {subject}/{format}/{version}
端点发送 GET
请求。
其响应是 JSON 格式的模式对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
按主题和格式检索现有模式
要按主题和格式检索现有模式,请向 /subject/format
端点发送 GET
请求。
其响应是模式列表,每个模式对象都是 JSON 格式,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
按 ID 检索现有模式
要按其 ID 检索模式,请向 /schemas/{id}
端点发送 GET
请求。
其响应是 JSON 格式的模式对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
按主题删除模式
DELETE /{subject}
按主题删除现有模式。
此说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。
Spring Cloud Stream 1.1.0.RELEASE 使用表名 |
使用 Confluent 的模式注册中心
默认配置会创建一个 DefaultSchemaRegistryClient
bean。
如果要使用 Confluent 模式注册中心,则需要创建一个 ConfluentSchemaRegistryClient
类型的 bean,它将取代框架默认配置的 bean。以下示例显示了如何创建这样的 bean:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
ConfluentSchemaRegistryClient 已针对 Confluent 平台版本 4.0.0 进行测试。 |
模式注册过程(序列化)
注册过程的第一部分是从通过通道发送的负载中提取模式。
Avro 类型(如 SpecificRecord
或 GenericRecord
)已经包含一个模式,可以立即从实例中检索。
对于 POJO,如果 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
属性设置为 true
(默认值),则会推断模式。
一旦获得模式,转换器就会从远程服务器加载其元数据(版本)。 首先,它查询本地缓存。如果未找到结果,则将数据提交到服务器,服务器会回复版本信息。 转换器始终缓存结果,以避免为每个需要序列化的新消息查询模式服务器的开销。
有了模式版本信息,转换器将消息的 contentType
标头设置为携带版本信息——例如:application/vnd.user.v1+avro
。
模式解析过程(反序列化)
当读取包含版本信息的消息时(即,contentType
标头具有 模式注册过程(序列化)
中描述的方案),转换器查询模式服务器以获取消息的写入器模式。
一旦找到传入消息的正确模式,它就会检索读取器模式,并使用 Avro 的模式解析支持将其读入读取器定义(设置默认值和任何缺失的属性)。
您应该了解写入器模式(写入消息的应用程序)和读取器模式(接收应用程序)之间的区别。
我们建议花点时间阅读 Avro 术语并理解该过程。
Spring Cloud Stream 始终获取写入器模式以确定如何读取消息。
如果要使 Avro 的模式演进支持工作,需要确保为应用程序正确设置了 |