Spring Cloud Stream Schema Registry

Introduction

When organizations have a messaging-based pub/sub architecture and multiple producer and consumer microservices communicate with each other, it is often necessary for all those microservices to agree on a contract that is based on a schema. When such a schema needs to evolve to accommodate new business requirements, the existing components are still required to continue to work. Spring Cloud Stream provides support for a standalone schema registry server, with which the aforementioned schemas can be registered and used by the applications. Spring Cloud Stream schema registry support also provides Avro-based schema registry clients, which essentially provide message converters that communicate with the schema registry to reconcile schemas during message conversion. The schema evolution support provided by Spring Cloud Stream works both with the aforementioned standalone schema registry and with the schema registry provided by Confluent, which works specifically with Apache Kafka.

Spring Cloud Stream Schema Registry overview

Spring Cloud Stream Schema Registry provides support for schema evolution so that the data can be evolved over time and still work with older or newer producers and consumers and vice versa. Most serialization models, especially the ones that aim for portability across different platforms and languages, rely on a schema that describes how the data is serialized in the binary payload. In order to serialize the data and then to interpret it, both the sending and receiving sides must have access to a schema that describes the binary format. In certain cases, the schema can be inferred from the payload type on serialization or from the target type on deserialization. However, many applications benefit from having access to an explicit schema that describes the binary data format. A schema registry lets you store schema information in a textual format (typically JSON) and makes that information accessible to various applications that need it to receive and send data in binary format. A schema is referenceable as a tuple consisting of:

  • A subject that is the logical name of the schema

  • The schema version

  • The schema format, which describes the binary format of the data

Spring Cloud Stream Schema Registry provides the following components:

  • Standalone Schema Registry Server. By default, it uses an H2 database, but the server can be used with PostgreSQL or MySQL by providing the appropriate datasource configuration.

  • Schema registry clients capable of message marshalling by communicating with a Schema Registry. Currently, the client can communicate with the standalone schema registry or the Confluent Schema Registry.

Schema Registry Client

The client-side abstraction for interacting with schema registry servers is the SchemaRegistryClient interface, which has the following structure:

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}
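To make the contract of these three operations concrete, the following is a minimal in-memory stand-in written in plain Java. It does not implement the real SchemaRegistryClient interface (that would require the Spring Cloud Stream jars); the names InMemorySchemaRegistry, Registration, and SchemaRef are hypothetical, and numbering versions from 1 per subject and format, with ids assigned registry-wide, is an assumption. It follows the registration behavior described later in this document: an identical schema returns the existing reference, and a changed schema gets a new version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SchemaRegistrationResponse: registry-wide id plus per-subject version.
record Registration(int id, int version) {}

// Hypothetical stand-in for SchemaReference: the (subject, version, format) tuple.
record SchemaRef(String subject, int version, String format) {}

// Hypothetical in-memory registry mimicking the register/fetch contract (not the real client).
class InMemorySchemaRegistry {
    private record Entry(int id, String subject, String format, int version, String definition) {}

    private final List<Entry> entries = new ArrayList<>();
    private final Map<Integer, Entry> byId = new HashMap<>();

    // Returns the existing registration if an identical definition is already stored
    // under this subject and format; otherwise stores it as the next version.
    synchronized Registration register(String subject, String format, String schema) {
        int latest = 0;
        for (Entry e : entries) {
            if (e.subject().equals(subject) && e.format().equals(format)) {
                if (e.definition().equals(schema)) {
                    return new Registration(e.id(), e.version());
                }
                latest = Math.max(latest, e.version());
            }
        }
        Entry created = new Entry(entries.size() + 1, subject, format, latest + 1, schema);
        entries.add(created);
        byId.put(created.id(), created);
        return new Registration(created.id(), created.version());
    }

    // Looks up a definition by subject, version, and format; returns null if absent.
    synchronized String fetch(SchemaRef ref) {
        for (Entry e : entries) {
            if (e.subject().equals(ref.subject()) && e.format().equals(ref.format())
                    && e.version() == ref.version()) {
                return e.definition();
            }
        }
        return null;
    }

    // Looks up a definition by its registry-wide id; returns null if absent.
    synchronized String fetch(int id) {
        Entry e = byId.get(id);
        return e == null ? null : e.definition();
    }
}
```

Keeping ids global while scoping versions to a subject and format mirrors the two lookup paths of the interface: one by schema reference and one by numeric id.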

Spring Cloud Stream provides out-of-the-box implementations for interacting with its own schema server and for interacting with the Confluent Schema Registry.

A client for the Spring Cloud Stream schema registry can be configured by using the @EnableSchemaRegistryClient annotation, as follows:

@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}

The default converter is optimized to cache not only the schemas from the remote server but also the results of the parse() and toString() methods, which are quite expensive. Because of this, it uses a DefaultSchemaRegistryClient that does not cache responses. If you intend to change the default behavior, you can use the client directly in your code and override it to get the desired outcome. To enable response caching in the client, add the property spring.cloud.stream.schemaRegistryClient.cached=true to your application properties.

Schema Registry Client Properties

The Schema Registry Client supports the following properties:

spring.cloud.stream.schemaRegistryClient.endpoint

The location of the schema server. When setting this, use a full URL, including the protocol (http or https), port, and context path.

Default: http://localhost:8990/

spring.cloud.stream.schemaRegistryClient.cached

Whether the client should cache schema server responses. Normally set to false, as the caching happens in the message converter. Applications that use the schema registry client directly (outside the message converter) should set this to true.

Default: false

Avro Schema Registry Client Message Converters

For applications that have a SchemaRegistryClient bean registered with the application context, Spring Cloud Stream auto-configures an Apache Avro message converter for schema management. This eases schema evolution, as applications that receive messages can get easy access to a writer schema that can be reconciled with their own reader schema.

For outbound messages, if the content type of the binding is set to application/*+avro, the MessageConverter is activated, as shown in the following example:

spring.cloud.stream.bindings.<output-binding-name>.contentType=application/*+avro

During the outbound conversion, the message converter tries to infer the schema of each outbound message (based on its type) and registers it to a subject (based on the payload type) by using the SchemaRegistryClient. If an identical schema is already registered, a reference to it is retrieved. If not, the schema is registered, and a new version number is provided. The message is sent with a contentType header that uses the following scheme: application/[prefix].[subject].v[version]+avro, where prefix is configurable and subject is deduced from the payload type.

For example, a message of the type User might be sent as a binary payload with a content type of application/vnd.user.v2+avro, where user is the subject and 2 is the version number.
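The header value is simply the three parts joined according to the scheme above. The class and method names below are hypothetical helpers, not part of the Spring Cloud Stream API; the sketch only restates the documented scheme so that the mapping from (prefix, subject, version) to the header is explicit.

```java
// Hypothetical helper restating the documented header scheme:
// application/[prefix].[subject].v[version]+avro
final class AvroContentTypes {
    private AvroContentTypes() {
    }

    static String contentType(String prefix, String subject, int version) {
        return "application/" + prefix + "." + subject + ".v" + version + "+avro";
    }
}
```

With the default prefix, AvroContentTypes.contentType("vnd", "user", 2) returns "application/vnd.user.v2+avro", the value from the User example.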

When receiving messages, the converter infers the schema reference from the header of the incoming message and tries to retrieve it. The schema is used as the writer schema in the deserialization process.
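On the receiving side, the header has to be split back into prefix, subject, and version before the writer schema can be looked up. The following sketch does that with a regular expression; the names VersionedContentType and ContentTypeParser are hypothetical, and it assumes the prefix contains no dots while the subject may (as a namespace-qualified subject would). It is an illustration of the idea, not the converter's actual parsing code.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parsed form of a versioned Avro content type.
record VersionedContentType(String prefix, String subject, int version) {}

final class ContentTypeParser {
    // prefix: no dots (assumption); subject: may contain dots; version: decimal digits.
    private static final Pattern VERSIONED =
            Pattern.compile("application/([^.]+)\\.(.+)\\.v(\\d+)\\+avro");

    private ContentTypeParser() {
    }

    // Returns empty when the header carries no subject and version information.
    static Optional<VersionedContentType> parse(String contentType) {
        Matcher m = VERSIONED.matcher(contentType);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new VersionedContentType(
                m.group(1), m.group(2), Integer.parseInt(m.group(3))));
    }
}
```

Because the subject group is greedy and the match is anchored to the trailing .v[version]+avro, a subject such as com.example.user is kept intact. A header without version information, such as application/avro, yields an empty result, meaning there is no schema reference to fetch.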

Avro Schema Registry Message Converter Properties

If you have enabled the Avro-based schema registry client by setting spring.cloud.stream.bindings.<output-binding-name>.contentType=application/*+avro, you can customize the behavior of the registration by setting the following properties.

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

Enable if you want the converter to use reflection to infer a Schema from a POJO.

Default: false

spring.cloud.stream.schema.avro.readerSchema

Avro compares schema versions by looking at a writer schema (the originating payload) and a reader schema (your application payload). See the Avro documentation for more information. If set, this property overrides any lookups at the schema server and uses the local schema as the reader schema.

Default: null

spring.cloud.stream.schema.avro.schemaLocations

Registers any .avsc files listed in this property with the Schema Server.

Default: empty

spring.cloud.stream.schema.avro.prefix

The prefix to be used on the Content-Type header.

Default: vnd

spring.cloud.stream.schema.avro.subjectNamingStrategy

Determines the subject name used to register the Avro schema in the schema registry. Two implementations are available, org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy, where the subject is the schema name, and org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy, which returns a fully qualified subject using the Avro schema namespace and name. Custom strategies can be created by implementing org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy.

Default: org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
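The two built-in strategies differ only in whether the Avro namespace becomes part of the subject. The stand-in below takes the namespace and name as plain strings instead of an org.apache.avro.Schema, so it is not the real SubjectNamingStrategy interface; SubjectNamer and SubjectNamers are hypothetical names, and the lower-casing is an assumption inferred from the User/user example earlier in this section. Check the actual classes before relying on the exact output.

```java
import java.util.Locale;

// Hypothetical stand-in for SubjectNamingStrategy, using strings instead of an Avro Schema.
@FunctionalInterface
interface SubjectNamer {
    String toSubject(String namespace, String name);
}

final class SubjectNamers {
    private SubjectNamers() {
    }

    // Mirrors the described default: the subject is the schema name (lower-casing assumed).
    static final SubjectNamer SIMPLE = (namespace, name) -> name.toLowerCase(Locale.ROOT);

    // Mirrors the described qualified variant: namespace plus name (lower-casing assumed).
    static final SubjectNamer QUALIFIED = (namespace, name) ->
            (namespace == null || namespace.isEmpty() ? name : namespace + "." + name)
                    .toLowerCase(Locale.ROOT);
}
```

A qualified subject avoids collisions when two record types share a simple name in different namespaces, at the cost of longer content-type headers.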

spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer

Ignore any schema registry communication. Useful for testing purposes so that when running a unit test, it does not unnecessarily try to connect to a Schema Registry server.

Default: false

Apache Avro Message Converters

Spring Cloud Stream provides support for schema-based message converters through its spring-cloud-stream-schema-registry-client module. Currently, the only serialization format supported out of the box for schema-based message converters is Apache Avro, with more formats to be added in future versions.

The spring-cloud-stream-schema-registry-client module contains two types of message converters that can be used for Apache Avro serialization:

  • Converters that use the class information of the serialized or deserialized objects or a schema with a location known at startup.

  • Converters that use a schema registry. They locate the schemas at runtime and dynamically register new schemas as domain objects evolve.

Converters with Schema Support

The AvroSchemaMessageConverter supports serializing and deserializing messages either by using a predefined schema or by using the schema information available in the class (either reflectively or contained in the SpecificRecord). If you provide a custom converter, the default AvroSchemaMessageConverter bean is not created.

To use a custom converter, add it to the application context as a bean, optionally specifying one or more MimeTypes with which to associate it. The default MimeType is application/avro.

If the target type of the conversion is a GenericRecord, a schema must be set.

The following example shows how to configure a converter in a sink application by registering the Apache Avro MessageConverter without a predefined schema. In this example, note that the MIME type value is avro/bytes, not the default application/avro.

@SpringBootApplication
public class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

Conversely, the following application registers a converter with a predefined schema (found on the classpath):

@SpringBootApplication
public class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

Schema Registry Server

Spring Cloud Stream provides a schema registry server implementation. To use it, you can download the latest spring-cloud-stream-schema-registry-server release and run it as a standalone application:

wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar

You can embed the schema registry in your existing Spring Boot web application. To do this, add the spring-cloud-stream-schema-registry-core artifact to your project and use the @EnableSchemaRegistryServer annotation, which adds the schema registry server REST controller to your application. The following example shows a Spring Boot application that enables the schema registry:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SchemaRegistryServerApplication.class, args);
    }
}

The spring.cloud.stream.schema.server.path property can be used to control the root path of the schema server (especially when it is embedded in other applications). The spring.cloud.stream.schema.server.allowSchemaDeletion boolean property enables the deletion of a schema. By default, this is disabled.

The schema registry server uses a relational database to store the schemas. By default, it uses an embedded database. You can customize the schema storage by using the Spring Boot SQL database and JDBC configuration options.

Schema Registry Server API

The Schema Registry Server API consists of the following operations:

Registering a New Schema

To register a new schema, send a POST request to the / endpoint.

The / endpoint accepts a JSON payload with the following fields:

  • subject: The schema subject

  • format: The schema format

  • definition: The schema definition

Its response is a schema object in JSON, with the following fields:

  • id: The schema ID

  • subject: The schema subject

  • format: The schema format

  • version: The schema version

  • definition: The schema definition
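A registration request can be built with the JDK's own java.net.http client. The sketch below only builds the request (it does not send it), so it can be inspected without a running server. RegisterRequests is a hypothetical name, the base URI is assumed to be the full endpoint URL including any context path, and the hand-rolled JSON escaping handles only quotes and backslashes, which is an assumption about the schema text; a JSON library is the safer choice in real code.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds (but does not send) a schema registration request.
final class RegisterRequests {
    private RegisterRequests() {
    }

    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // JSON payload with the subject, format, and definition fields.
    static String body(String subject, String format, String definition) {
        return "{\"subject\":" + quote(subject)
                + ",\"format\":" + quote(format)
                + ",\"definition\":" + quote(definition) + "}";
    }

    // POST to the registry's root (/) endpoint, relative to the given base URI.
    static HttpRequest register(URI baseUri, String subject, String format, String definition) {
        return HttpRequest.newBuilder(baseUri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(subject, format, definition)))
                .build();
    }
}
```

To actually send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the response body is the JSON schema object with the fields listed above.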

Retrieving an Existing Schema by Subject, Format, and Version

To retrieve an existing schema by subject, format, and version, send a GET request to the /{subject}/{format}/{version} endpoint.

Its response is a schema object in JSON, with the following fields:

  • id: The schema ID

  • subject: The schema subject

  • format: The schema format

  • version: The schema version

  • definition: The schema definition

Retrieving an Existing Schema by Subject and Format

To retrieve the existing schemas for a subject and format, send a GET request to the /{subject}/{format} endpoint.

Its response is a JSON list of schema objects, each with the following fields:

  • id: The schema ID

  • subject: The schema subject

  • format: The schema format

  • version: The schema version

  • definition: The schema definition

Retrieving an Existing Schema by ID

To retrieve a schema by its ID, send a GET request to the /schemas/{id} endpoint.

Its response is a schema object in JSON, with the following fields:

  • id: The schema ID

  • subject: The schema subject

  • format: The schema format

  • version: The schema version

  • definition: The schema definition
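The three retrieval endpoints differ only in their path. The following hypothetical helper builds the corresponding GET requests with java.net.http without sending them. It assumes the base URI ends with a slash, so that relative resolution keeps any context path set through spring.cloud.stream.schema.server.path, and that subjects and formats are simple tokens that need no percent-encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds (but does not send) schema lookup requests.
final class LookupRequests {
    private LookupRequests() {
    }

    // GET {base}{subject}/{format}/{version}: a single schema object.
    static HttpRequest bySubjectFormatVersion(URI base, String subject, String format, int version) {
        return get(base.resolve(subject + "/" + format + "/" + version));
    }

    // GET {base}{subject}/{format}: a list of schema objects.
    static HttpRequest bySubjectFormat(URI base, String subject, String format) {
        return get(base.resolve(subject + "/" + format));
    }

    // GET {base}schemas/{id}: a single schema object.
    static HttpRequest byId(URI base, int id) {
        return get(base.resolve("schemas/" + id));
    }

    private static HttpRequest get(URI uri) {
        return HttpRequest.newBuilder(uri).header("Accept", "application/json").GET().build();
    }
}
```

Resolving relative paths against a base such as http://localhost:8080/registry/ yields http://localhost:8080/registry/user/avro, which is why the trailing slash on the base matters when the registry is embedded under a root path.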

Deleting a Schema by Subject, Format, and Version

To delete a schema identified by its subject, format, and version, send a DELETE request to the /{subject}/{format}/{version} endpoint.

Deleting a Schema by ID

To delete a schema by its ID, send a DELETE request to the /schemas/{id} endpoint.

Deleting a Schema by Subject

DELETE /{subject}

Delete existing schemas by their subject.
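The three deletion endpoints share the paths of the retrieval endpoints but use the DELETE method. As described for the server configuration, the server only permits deletion when spring.cloud.stream.schema.server.allowSchemaDeletion is enabled. The helper below is hypothetical and only builds the requests; it assumes a base URI ending with a slash and subjects that need no percent-encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds (but does not send) schema deletion requests.
final class DeleteRequests {
    private DeleteRequests() {
    }

    // DELETE {base}{subject}/{format}/{version}
    static HttpRequest bySubjectFormatVersion(URI base, String subject, String format, int version) {
        return delete(base.resolve(subject + "/" + format + "/" + version));
    }

    // DELETE {base}schemas/{id}
    static HttpRequest byId(URI base, int id) {
        return delete(base.resolve("schemas/" + id));
    }

    // DELETE {base}{subject}: removes every schema registered under the subject.
    static HttpRequest bySubject(URI base, String subject) {
        return delete(base.resolve(subject));
    }

    private static HttpRequest delete(URI uri) {
        return HttpRequest.newBuilder(uri).DELETE().build();
    }
}
```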

This note applies to users of Spring Cloud Stream 1.1.0.RELEASE only. Spring Cloud Stream 1.1.0.RELEASE used the table name, schema, for storing Schema objects. Schema is a keyword in a number of database implementations. To avoid any conflicts in the future, starting with 1.1.1.RELEASE, we have opted for the name SCHEMA_REPOSITORY for the storage table. Any Spring Cloud Stream 1.1.0.RELEASE users who upgrade should migrate their existing schemas to the new table before upgrading.

Using Confluent’s Schema Registry

The default configuration creates a DefaultSchemaRegistryClient bean. If you want to use the Confluent schema registry, you need to create a bean of type ConfluentSchemaRegistryClient, which supersedes the one configured by default by the framework. The following example shows how to create such a bean:

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}

The ConfluentSchemaRegistryClient is tested against Confluent platform version 4.0.0.

Schema Registration and Resolution

To better understand how Spring Cloud Stream registers and resolves new schemas and its use of Avro schema comparison features, we provide two separate subsections:

Schema Registration Process (Serialization)

The first part of the registration process is extracting a schema from the payload that is being sent over a channel. Avro types such as SpecificRecord or GenericRecord already contain a schema, which can be retrieved immediately from the instance. In the case of POJOs, a schema is inferred if the spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled property is set to true (it defaults to false, as listed under Avro Schema Registry Message Converter Properties).

Once a schema is obtained, the converter loads its metadata (version) from the remote server. First, it queries a local cache. If no result is found, it submits the data to the server, which replies with versioning information. The converter always caches the results to avoid the overhead of querying the schema server for every new message that needs to be serialized.

With the schema version information, the converter sets the contentType header of the message to carry the version information — for example: application/vnd.user.v1+avro.
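The cache-then-server lookup can be sketched with a ConcurrentHashMap. This is not the converter's code: CachingVersionResolver is a hypothetical name, the remote registration call is replaced by a plain Function supplied by the caller, and keying the cache by the schema's text is an assumption made to keep the example self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of a local version cache in front of a schema server.
final class CachingVersionResolver {
    private final Map<String, Integer> versionsBySchema = new ConcurrentHashMap<>();
    private final Function<String, Integer> server; // stands in for the remote registration call
    private final AtomicInteger serverCalls = new AtomicInteger();

    CachingVersionResolver(Function<String, Integer> server) {
        this.server = server;
    }

    // Consults the local cache first and only contacts the server on a miss.
    int versionOf(String schemaDefinition) {
        return versionsBySchema.computeIfAbsent(schemaDefinition, definition -> {
            serverCalls.incrementAndGet();
            return server.apply(definition);
        });
    }

    // Number of times the (simulated) server was contacted.
    int serverCalls() {
        return serverCalls.get();
    }
}
```

Serializing many messages with the same schema therefore costs one server round trip; only a new or changed schema triggers another.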

Schema Resolution Process (Deserialization)

When reading messages that contain version information (that is, a contentType header with a scheme like the one described under Schema Registration Process (Serialization)), the converter queries the schema server to fetch the writer schema of the message. Once it has found the correct schema of the incoming message, it retrieves the reader schema and, by using Avro's schema resolution support, reads the message into the reader definition (setting defaults for any missing properties).

You should understand the difference between a writer schema (the application that wrote the message) and a reader schema (the receiving application). We suggest taking a moment to read the Avro terminology and understand the process. Spring Cloud Stream always fetches the writer schema to determine how to read a message. If you want to get Avro’s schema evolution support working, you need to make sure that a readerSchema was properly set for your application.
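The reconciliation between writer and reader can be illustrated without Avro itself. The following is a deliberately simplified model, not Avro's resolution algorithm: a record is a map of field values, the reader schema is a map from field name to an optional default, fields the writer wrote but the reader does not declare are dropped, and reader fields the writer did not write take the reader's default (or fail if there is none). Type promotion, aliases, unions, and nested records are ignored, and ReaderProjection is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Simplified illustration of writer/reader field resolution over maps; not Avro's implementation.
final class ReaderProjection {
    private ReaderProjection() {
    }

    // written: field values as produced with the writer schema.
    // readerFields: reader field names mapped to an optional default value.
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Optional<Object>> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Object>> field : readerFields.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name)); // in both schemas: keep the written value
            } else if (field.getValue().isPresent()) {
                result.put(name, field.getValue().get()); // reader-only field: use its default
            } else {
                throw new IllegalArgumentException("No written value or default for field " + name);
            }
        }
        return result; // writer-only fields are not copied
    }
}
```

This is why a reader schema that adds a field with a default can still read older messages, while adding a field without a default breaks compatibility with them.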