Spring Cloud Stream Schema Registry
Introduction
When an organization has a messaging-based pub/sub architecture in which multiple producer and consumer microservices communicate with each other, those microservices often need to agree on a contract that is based on a schema. When such a schema has to evolve to accommodate new business requirements, the existing components must continue to work. Spring Cloud Stream supports a standalone schema registry server, with which such schemas can be registered and then used by applications. The schema registry support also includes Avro-based schema registry clients, which essentially provide message converters that communicate with the schema registry to reconcile schemas during message conversion. The schema evolution support provided by Spring Cloud Stream works both with the standalone schema registry and with the schema registry provided by Confluent, which works specifically with Apache Kafka.
Spring Cloud Stream Schema Registry overview
Spring Cloud Stream Schema Registry provides support for schema evolution, so that data can evolve over time and still work with older or newer producers and consumers. Most serialization models, especially the ones that aim for portability across different platforms and languages, rely on a schema that describes how the data is serialized in the binary payload. In order to serialize the data and then to interpret it, both the sending and receiving sides must have access to a schema that describes the binary format. In certain cases, the schema can be inferred from the payload type on serialization or from the target type on deserialization. However, many applications benefit from having access to an explicit schema that describes the binary data format. A schema registry lets you store schema information in a textual format (typically JSON) and makes that information accessible to the various applications that need it to receive and send data in binary format. A schema is referenceable as a tuple consisting of:
-
A subject that is the logical name of the schema
-
The schema version
-
The schema format, which describes the binary format of the data
Spring Cloud Stream Schema Registry provides the following components:
-
Standalone Schema Registry Server. By default, it uses an H2 database, but the server can be used with PostgreSQL or MySQL by providing the appropriate datasource configuration.
-
Schema registry clients capable of message marshalling by communicating with a Schema Registry. Currently, the client can communicate with the standalone schema registry or the Confluent Schema Registry.
Schema Registry Client
The client-side abstraction for interacting with schema registry servers is the SchemaRegistryClient interface, which has the following structure:
public interface SchemaRegistryClient {
    SchemaRegistrationResponse register(String subject, String format, String schema);
    String fetch(SchemaReference schemaReference);
    String fetch(Integer id);
}
Spring Cloud Stream provides out-of-the-box implementations for interacting with its own schema server and for interacting with the Confluent Schema Registry.
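To make the contract of the three methods more concrete, the following is a minimal in-memory sketch, not the library's implementation. The class InMemorySchemaRegistry and its nested Registration type are hypothetical stand-ins for SchemaRegistryClient and SchemaRegistrationResponse, and the subject, format, and version are passed as plain arguments instead of a SchemaReference. The sketch assumes that registering an identical schema returns the existing reference and that a changed schema receives the next version number for its subject and format.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory registry (hypothetical); it only illustrates the register/fetch contract. */
final class InMemorySchemaRegistry {

    /** Hypothetical stand-in for the response returned by register(...). */
    static final class Registration {
        final int id;
        final int version;

        Registration(int id, int version) {
            this.id = id;
            this.version = version;
        }
    }

    private static final class Entry {
        final int id;
        final String subject;
        final String format;
        final int version;
        final String schema;

        Entry(int id, String subject, String format, int version, String schema) {
            this.id = id;
            this.subject = subject;
            this.format = format;
            this.version = version;
            this.schema = schema;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Returns the existing reference for an identical schema, otherwise stores a new version. */
    Registration register(String subject, String format, String schema) {
        int latest = 0;
        for (Entry e : entries) {
            if (e.subject.equals(subject) && e.format.equals(format)) {
                if (e.schema.equals(schema)) {
                    return new Registration(e.id, e.version);
                }
                latest = Math.max(latest, e.version);
            }
        }
        Entry created = new Entry(entries.size() + 1, subject, format, latest + 1, schema);
        entries.add(created);
        return new Registration(created.id, created.version);
    }

    /** Looks a schema up by its (subject, format, version) tuple; null when unknown. */
    String fetch(String subject, String format, int version) {
        for (Entry e : entries) {
            if (e.subject.equals(subject) && e.format.equals(format) && e.version == version) {
                return e.schema;
            }
        }
        return null;
    }

    /** Looks a schema up by its ID; IDs are assigned sequentially starting at 1. */
    String fetch(int id) {
        return (id >= 1 && id <= entries.size()) ? entries.get(id - 1).schema : null;
    }
}
```

A real client talks to a remote server instead of a list, but the same two lookup paths (by tuple and by ID) apply.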
A client for the Spring Cloud Stream schema registry can be configured by using the @EnableSchemaRegistryClient annotation, as follows:
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
The default converter is optimized to cache not only the schemas from the remote server but also the |
Schema Registry Client Properties
The Schema Registry Client supports the following properties:
- spring.cloud.stream.schemaRegistryClient.endpoint
  The location of the schema-server. When setting this, use a full URL, including protocol (http or https), port, and context path.
  Default:
- spring.cloud.stream.schemaRegistryClient.cached
  Whether the client should cache schema server responses. Normally set to false, as the caching happens in the message converter. Clients using the schema registry client should set this to true.
  Default: false
Avro Schema Registry Client Message Converters
For applications that have a SchemaRegistryClient bean registered with the application context, Spring Cloud Stream auto-configures an Apache Avro message converter for schema management. This eases schema evolution, as applications that receive messages can get easy access to a writer schema that can be reconciled with their own reader schema.
For outbound messages, if the content type of the binding is set to application/*+avro, the MessageConverter is activated, as shown in the following example:
spring.cloud.stream.bindings.<output-binding-name>.contentType=application/*+avro
During the outbound conversion, the message converter tries to infer the schema of each outbound message (based on its type) and registers it to a subject (based on the payload type) by using the SchemaRegistryClient.
If an identical schema is already found, then a reference to it is retrieved.
If not, the schema is registered, and a new version number is provided.
The message is sent with a contentType header by using the following scheme: application/[prefix].[subject].v[version]+avro, where prefix is configurable and subject is deduced from the payload type.
For example, a message of the type User might be sent as a binary payload with a content type of application/vnd.user.v2+avro, where user is the subject and 2 is the version number.
When receiving messages, the converter infers the schema reference from the header of the incoming message and tries to retrieve it. The schema is used as the writer schema in the deserialization process.
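The header scheme application/[prefix].[subject].v[version]+avro can be illustrated with a small, self-contained sketch that builds such a value and splits it back into its parts. The class AvroContentTypes and its methods are hypothetical helpers written for this illustration. They are not part of Spring Cloud Stream, and the converter's own parsing may differ.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that mirrors the application/[prefix].[subject].v[version]+avro scheme. */
final class AvroContentTypes {

    // Group 1: prefix, group 2: subject (may itself contain dots), group 3: version.
    private static final Pattern VERSIONED =
            Pattern.compile("application/([^.]+)\\.(.+)\\.v(\\d+)\\+avro");

    /** Builds the versioned content type for an outbound message. */
    static String format(String prefix, String subject, int version) {
        return "application/" + prefix + "." + subject + ".v" + version + "+avro";
    }

    /** Returns {prefix, subject, version}, or null when the header carries no version information. */
    static String[] parse(String contentType) {
        Matcher m = VERSIONED.matcher(contentType);
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }
}
```

With the User example above, format("vnd", "user", 2) produces application/vnd.user.v2+avro, and parsing that value yields the subject and version needed to look up the writer schema.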
Avro Schema Registry Message Converter Properties
If you have enabled the Avro-based schema registry client by setting spring.cloud.stream.bindings.<output-binding-name>.contentType=application/*+avro, you can customize the behavior of the registration by setting the following properties.
- spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
  Enable if you want the converter to use reflection to infer a Schema from a POJO.
  Default: false
- spring.cloud.stream.schema.avro.readerSchema
  Avro compares schema versions by looking at a writer schema (origin payload) and a reader schema (your application payload). See the Avro documentation for more information. If set, this overrides any lookups at the schema server and uses the local schema as the reader schema.
  Default: null
- spring.cloud.stream.schema.avro.schemaLocations
  Registers any .avsc files listed in this property with the Schema Server.
  Default: empty
- spring.cloud.stream.schema.avro.prefix
  The prefix to be used on the Content-Type header.
  Default: vnd
- spring.cloud.stream.schema.avro.subjectNamingStrategy
  Determines the subject name used to register the Avro schema in the schema registry. Two implementations are available: org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy, where the subject is the schema name, and org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy, which returns a fully qualified subject using the Avro schema namespace and name. Custom strategies can be created by implementing org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy.
  Default: org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
- spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
  Ignore any schema registry communication. Useful for testing purposes so that when running a unit test, it does not unnecessarily try to connect to a Schema Registry server.
  Default: false
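The difference between the two subject naming strategies can be sketched without any Avro dependency. The class SubjectNamingSketch below is hypothetical and only models the described behavior: the default strategy uses the schema name, and the qualified strategy combines namespace and name. Lower-casing the result is an assumption taken from the User example, where the type User maps to the subject user. The real strategies operate on an Avro Schema object, not on plain strings.

```java
import java.util.Locale;

/** Hypothetical model of the two naming strategies; real implementations receive an Avro Schema. */
final class SubjectNamingSketch {

    /** Default strategy as described: the subject is the schema name (lower-casing is assumed). */
    static String defaultSubject(String namespace, String name) {
        return name.toLowerCase(Locale.ROOT);
    }

    /** Qualified strategy as described: namespace and name form a fully qualified subject. */
    static String qualifiedSubject(String namespace, String name) {
        String fullName = (namespace == null || namespace.isEmpty())
                ? name
                : namespace + "." + name;
        return fullName.toLowerCase(Locale.ROOT);
    }
}
```

A qualified subject is useful when two schemas share a simple name but live in different namespaces, because the default strategy would map both to the same subject.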
Apache Avro Message Converters
Spring Cloud Stream provides support for schema-based message converters through its spring-cloud-stream-schema-registry-client module.
Currently, the only serialization format supported out of the box for schema-based message converters is Apache Avro, with more formats to be added in future versions.
The spring-cloud-stream-schema-registry-client module contains two types of message converters that can be used for Apache Avro serialization:
-
Converters that use the class information of the serialized or deserialized objects or a schema with a location known at startup.
-
Converters that use a schema registry. They locate the schemas at runtime and dynamically register new schemas as domain objects evolve.
Converters with Schema Support
The AvroSchemaMessageConverter supports serializing and deserializing messages either by using a predefined schema or by using the schema information available in the class (either reflectively or contained in the SpecificRecord).
If you provide a custom converter, then the default AvroSchemaMessageConverter bean is not created.
To use a custom converter, add it to the application context, optionally specifying one or more MimeTypes with which to associate it.
The default MimeType is application/avro.
If the target type of the conversion is a GenericRecord, a schema must be set.
The following example shows how to configure a converter in a sink application by registering the Apache Avro MessageConverter without a predefined schema.
In this example, note that the MIME type value is avro/bytes, not the default application/avro.
@SpringBootApplication
public static class SinkApplication {

    //...

    // Registers the Avro converter for the avro/bytes MIME type, without a predefined schema
    @Bean
    public MessageConverter userMessageConverter() {
        return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
    }
}
Conversely, the following application registers a converter with a predefined schema (found on the classpath):
@SpringBootApplication
public static class SinkApplication {

    //...

    // Registers the Avro converter with a predefined schema loaded from the classpath
    @Bean
    public MessageConverter userMessageConverter() {
        AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
        converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
        return converter;
    }
}
Schema Registry Server
Spring Cloud Stream provides a schema registry server implementation.
To use it, you can download the latest spring-cloud-stream-schema-registry-server release and run it as a standalone application:
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
You can embed the schema registry in your existing Spring Boot web application.
To do this, add the
|
The spring.cloud.stream.schema.server.path property can be used to control the root path of the schema server (especially when it is embedded in other applications).
The spring.cloud.stream.schema.server.allowSchemaDeletion boolean property enables the deletion of a schema. By default, this is disabled.
The schema registry server uses a relational database to store the schemas. By default, it uses an embedded database. You can customize the schema storage by using the Spring Boot SQL database and JDBC configuration options.
Schema Registry Server API
The Schema Registry Server API consists of the following operations:
- POST / (see Registering a New Schema)
- GET /{subject}/{format}/{version} (see Retrieving an Existing Schema by Subject, Format, and Version)
- GET /{subject}/{format} (see Retrieving an Existing Schema by Subject and Format)
- GET /schemas/{id} (see Retrieving an Existing Schema by ID)
- DELETE /{subject}/{format}/{version} (see Deleting a Schema by Subject, Format, and Version)
- DELETE /schemas/{id} (see Deleting a Schema by ID)
- DELETE /{subject} (see Deleting a Schema by Subject)
Registering a New Schema
To register a new schema, send a POST request to the / endpoint.
The / endpoint accepts a JSON payload with the following fields:
- subject: The schema subject
- format: The schema format
- definition: The schema definition
Its response is a schema object in JSON, with the following fields:
- id: The schema ID
- subject: The schema subject
- format: The schema format
- version: The schema version
- definition: The schema definition
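As a rough illustration of the registration call, the following sketch builds the POST request with the JDK's java.net.http API. The class RegisterSchemaRequest is hypothetical, and the base URL (http://localhost:8990) and the format value avro used in the usage note are assumptions, not values confirmed by this section. The JSON is assembled by hand with minimal escaping to keep the example dependency-free. A real client would normally use a JSON library.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that prepares the POST / registration request. */
final class RegisterSchemaRequest {

    /** Minimal escaping of backslashes and double quotes; control characters are not handled. */
    static String jsonString(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds the JSON payload with the subject, format, and definition fields. */
    static String body(String subject, String format, String definition) {
        return "{\"subject\":" + jsonString(subject)
                + ",\"format\":" + jsonString(format)
                + ",\"definition\":" + jsonString(definition) + "}";
    }

    /** Builds (but does not send) the request against the root endpoint of the given server. */
    static HttpRequest build(String baseUrl, String subject, String format, String definition) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(subject, format, definition)))
                .build();
    }
}
```

The request could then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), for example after RegisterSchemaRequest.build("http://localhost:8990", "user", "avro", definition). The response body would contain the schema object described above.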
Retrieving an Existing Schema by Subject, Format, and Version
To retrieve an existing schema by subject, format, and version, send a GET request to the /{subject}/{format}/{version} endpoint.
Its response is a schema object in JSON, with the following fields:
- id: The schema ID
- subject: The schema subject
- format: The schema format
- version: The schema version
- definition: The schema definition
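The lookup by subject, format, and version can be sketched in the same way. The class FetchSchemaRequest is hypothetical, and the base URL passed to it is an assumption. Each path segment is percent-encoded so that unusual subject names do not break the URL.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that prepares the GET /{subject}/{format}/{version} request. */
final class FetchSchemaRequest {

    /** URLEncoder targets form encoding, so '+' is turned back into %20 for use in a path. */
    private static String segment(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds (but does not send) the request for one specific schema version. */
    static HttpRequest build(String baseUrl, String subject, String format, int version) {
        URI uri = URI.create(baseUrl + "/" + segment(subject) + "/" + segment(format) + "/" + version);
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```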
Retrieving an Existing Schema by Subject and Format
To retrieve an existing schema by subject and format, send a GET request to the /{subject}/{format} endpoint.
Its response is a JSON list of schema objects, each with the following fields:
- id: The schema ID
- subject: The schema subject
- format: The schema format
- version: The schema version
- definition: The schema definition
Retrieving an Existing Schema by ID
To retrieve a schema by its ID, send a GET request to the /schemas/{id} endpoint.
Its response is a schema object in JSON, with the following fields:
- id: The schema ID
- subject: The schema subject
- format: The schema format
- version: The schema version
- definition: The schema definition
Deleting a Schema by Subject, Format, and Version
To delete a schema identified by its subject, format, and version, send a DELETE request to the /{subject}/{format}/{version} endpoint.
Deleting a Schema by ID
To delete a schema by its ID, send a DELETE request to the /schemas/{id} endpoint.
Deleting a Schema by Subject
To delete existing schemas by their subject, send a DELETE request to the /{subject} endpoint.
This note applies to users of Spring Cloud Stream 1.1.0.RELEASE only.
Spring Cloud Stream 1.1.0.RELEASE used the table name, |
Using Confluent’s Schema Registry
The default configuration creates a DefaultSchemaRegistryClient bean.
If you want to use the Confluent schema registry, you need to create a bean of type ConfluentSchemaRegistryClient, which supersedes the one configured by default by the framework. The following example shows how to create such a bean:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
    // Point the Confluent client at the endpoint configured for the schema registry client
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endpoint);
    return client;
}
The ConfluentSchemaRegistryClient is tested against Confluent platform version 4.0.0.
Schema Registration and Resolution
To better understand how Spring Cloud Stream registers and resolves new schemas and its use of Avro schema comparison features, we provide two separate subsections:
Schema Registration Process (Serialization)
The first part of the registration process is extracting a schema from the payload that is being sent over a channel.
Avro types such as SpecificRecord or GenericRecord already contain a schema, which can be retrieved immediately from the instance.
In the case of POJOs, a schema is inferred if the spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled property is set to true (the property listing above gives its default as false).
Once a schema is obtained, the converter loads its metadata (version) from the remote server. First, it queries a local cache. If no result is found, it submits the data to the server, which replies with versioning information. The converter always caches the results to avoid the overhead of querying the Schema Server for every new message that needs to be serialized.
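The cache-then-server lookup described here can be sketched as follows. The class CachingVersionLookup is a hypothetical illustration, not the converter's actual code, and the remote call is modeled as a plain function from schema text to version number.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache-first lookup of the version assigned to a schema. */
final class CachingVersionLookup {

    private final Map<String, Integer> versionsBySchema = new ConcurrentHashMap<>();
    private final Function<String, Integer> registryCall;

    /** registryCall stands in for the remote registration request (an assumption of this sketch). */
    CachingVersionLookup(Function<String, Integer> registryCall) {
        this.registryCall = registryCall;
    }

    /** Consults the local cache first and calls the registry only on a miss, caching the answer. */
    int versionFor(String schemaText) {
        return versionsBySchema.computeIfAbsent(schemaText, registryCall);
    }
}
```

With this shape, serializing many messages of the same type results in a single round trip to the server, and every later message is answered from the cache.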
With the schema version information, the converter sets the contentType header of the message to carry the version information (for example, application/vnd.user.v1+avro).
Schema Resolution Process (Deserialization)
When reading messages that contain version information (that is, a contentType header with a scheme like the one described under Schema Registration Process (Serialization)), the converter queries the Schema server to fetch the writer schema of the message.
Once it has found the correct schema of the incoming message, it retrieves the reader schema and, by using Avro's schema resolution support, reads it into the reader definition (setting defaults and any missing properties).
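The effect of schema resolution can be approximated with a toy model that uses plain maps instead of Avro types. The class ResolutionSketch is hypothetical and greatly simplified. It only models three rules: fields the reader does not know are dropped, fields missing from the written data take the reader's default, and a missing field without a default is an error. Real Avro resolution also handles type promotion, aliases, unions, and nested records.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of reader/writer resolution; not Avro's implementation. */
final class ResolutionSketch {

    /**
     * written: field values as produced with the writer schema.
     * readerDefaults: every reader field mapped to its default (null means "no default").
     */
    static Map<String, Object> resolve(Map<String, Object> written, Map<String, Object> readerDefaults) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                // The writer supplied this field, so its value is kept
                result.put(name, written.get(name));
            } else if (field.getValue() != null) {
                // The writer did not know this field, so the reader's default is used
                result.put(name, field.getValue());
            } else {
                throw new IllegalArgumentException("No value or default for field: " + name);
            }
        }
        // Fields present only in the written data are ignored
        return result;
    }
}
```

For example, a record written with the fields name and legacy and read with a reader definition that declares name and an email field with a default yields name from the message and email from the default, while legacy is dropped.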
You should understand the difference between a writer schema (the schema used by the application that wrote the message) and a reader schema (the schema used by the receiving application).
We suggest taking a moment to read the Avro terminology and understand the process.
Spring Cloud Stream always fetches the writer schema to determine how to read a message.
If you want to get Avro’s schema evolution support working, you need to make sure that a |