Debezium Support

Debezium Engine,变更数据捕获 (CDC) 入站通道适配器。DebeziumMessageProducer 允许捕获数据库变更事件,将它们转换为消息,然后流传输到出站通道。

Debezium Engine, Change Data Capture (CDC) inbound channel adapter. The DebeziumMessageProducer allows capturing database change events, converting them into messages and streaming later to the outbound channels.

您需要将 Spring Integration Debezium 依赖项包含到您的项目:

You need to include the spring integration Debezium dependency to your project:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:{project-version}"

你还需要为输入数据库包含 debezium connector 依赖项。例如,要将 Debezium 与 PostgreSQL 配合使用,你需要 postgres debezium 连接器:

You also need to include a debezium connector dependency for your input Database. For example to use Debezium with PostgreSQL you will need the postgres debezium connector:

  • Maven

  • Gradle

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"

用与正在使用的 spring-integration-debezium 版本兼容的版本替换 debezium-version

Replace the debezium-version with the version compatible with the spring-integration-debezium version being used.

Inbound Debezium Channel Adapter

Debezium 适配器需要预先配置好的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> 实例。

The Debezium adapter expects a pre-configured DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> instance.

debezium-supplier 提供了一个开箱即用的 DebeziumEngine.Builder Spring Boot 自动配置,以及一个方便的 DebeziumProperties 配置抽象。

The debezium-supplier provides an out of the box DebeziumEngine.Builder Spring Boot auto-configuration with a handy DebeziumProperties configuration abstraction.

Debezium Java DSL 可以从提供的 DebeziumEngine.Builder 创建一个 DebeziumMessageProducer 实例,也可以从普通的 Debezium 配置(例如 java.util.Properties)创建。对于某些带有自定配置和序列化格式的常见用例,后者可能派得上用场。

The Debezium Java DSL can create a DebeziumMessageProducer instance from a provided DebeziumEngine.Builder, as well as from a plain Debezium configuration (e.g. java.util.Properties). Later can be handy for some common use-cases with opinionated configuration and serialization formats.

此外,DebeziumMessageProducer 可以使用以下配置属性进行调整:

Additionally, the DebeziumMessageProducer can be tuned with the following configuration properties:

  • contentType- 允许处理 JSON (默认)、AVROPROTOBUF 消息内容。contentType must 可以与为提供的 DebeziumEngine.Builder 所配置的 SerializationFormat 对齐。

  • contentType - allows handling for JSON (default), AVRO and PROTOBUF message contents. The contentType must be be aligned with the SerializationFormat configured for the provided DebeziumEngine.Builder.

  • enableBatch- 当设置为 false (默认) 时,Debezium 适配器将为从源数据库接收到的每个 ChangeEvent 数据变更事件发送新的 Message。如果设置为 true,则该适配器会向下游发送单个 Message,用于从 Debezium 引擎接收的每批 ChangeEvent。此类有效内容不可序列化,需要自定义序列化/反序列化实现。

  • enableBatch - when set to false (default), the debezium adapter would send new Message for every ChangeEvent data change event received from the source database. If set to true then the adapter sends downstream a single Message for each batch of ChangeEvent received from the Debezium engine. Such a payload is not serializable and would require a custom serialization/deserialization implementation.

  • enableEmptyPayload- 启用对墓碑(又名删除)消息的支持。在数据库行删除时,Debezium 可以发送一个墓碑变更事件,该事件具有与已删除行相同的键和 Optional.empty 的值。默认为 false

  • enableEmptyPayload - Enables support for tombstone (aka delete) messages. On a database row delete, Debezium can send a tombstone change event that has the same key as the deleted row and a value of Optional.empty. Defaults to false.

  • headerMapper- 自定义 HeaderMapper 实现,该实现允许选择并将 ChangeEvent 标头转换为 Message 标头。默认 DefaultDebeziumHeaderMapper 实现为 setHeaderNamesToMap 提供了一个 setter。默认情况下,会映射所有标头。

  • headerMapper - custom HeaderMapper implementation that allows for selecting and converting the ChangeEvent headers into Message headers. The default DefaultDebeziumHeaderMapper implementation provides a setter for setHeaderNamesToMap. By default, all headers are mapped.

  • taskExecutor- 为 Debezium 引擎设置自定义 TaskExecutor

  • taskExecutor - Set a custom TaskExecutor for the Debezium engine.

下面的代码片段演示此通道适配器的各种配置:

The following code snippets demonstrate various configuration for this channel adapter:

Configuring with Java Configuration

以下 Spring Boot 应用展示了如何通过 Java 配置来配置入站适配器的一个示例:

The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {

        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {

        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)

        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)

        String payload = new String((byte[]) message.getPayload()); (3)

        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }

}
1 该事件预期的逻辑目标的名称。目标通常由 topic.prefix 配置选项、数据库名称和表名称组成。例如:my-topic.inventory.orders
2 A name of the logical destination for which the event is intended. Usually the destination is composed of the topic.prefix configuration option, the database name and the table name. For example: my-topic.inventory.orders.
3 包含已更改表的键架构和已更改行的实际键。在连接器创建事件时,键架构及其相应的键负载都包含已更改表(或唯一约束)中每一列的字段。
4 Contains the schema for the changed table’s key and the changed row’s actual key. Both the key schema and its corresponding key payload contain a field for each column in the changed table’s PRIMARY KEY (or unique constraint) at the time the connector created the event.
5 与键类似,负载有一个架构部分和一个负载值部分。架构部分包含描述负载值部分的消息封送结构的架构,包括其嵌套字段。创建、更新或删除数据的操作的变更事件都有一个具有消息封送结构的值负载。
6 Like the key, the payload has a schema section and a payload value section. The schema section contains the schema that describes the Envelope structure of the payload value section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.

key.converter.schemas.enable=false 和/或 value.converter.schemas.enable=false 允许分别禁用键或有效负载的消息内模式内容。

The key.converter.schemas.enable=false and/or value.converter.schemas.enable=false permit disabling the in-message schema content for key or payload respectively.

类似地,我们可以配置 DebeziumMessageProducer 以批量处理传入的更改事件:

Similarly, we can configure the DebeziumMessageProducer to process the incoming change events in batches:

@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {

    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
	debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}

Debezium Java DSL Support

spring-integration-debezium 通过 Debezium 工厂和 DebeziumMessageProducerSpec 实现提供方便的 Java DSL 流畅 API。

The spring-integration-debezium provides a convenient Java DSL fluent API via the Debezium factory and the DebeziumMessageProducerSpec implementations.

适用于 Debezium Java DSL 的入站通道适配器是:

The Inbound Channel Adapter for Debezium Java DSL is:

 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>   debeziumEngineBuilder = ...
 IntegrationFlow.from(
    Debezium.inboundChannelAdapter(debeziumEngineBuilder)
        .headerNames("special*")
        .contentType("application/json")
        .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

或者通过本机 debezium 配置属性创建 DebeziumMessageProducerSpec 实例,并默认为 JSON 序列化格式。

Or create an DebeziumMessageProducerSpec instance from native debezium configuration properties and default to JSON serialization formats.

 Properties debeziumConfig = ...
 IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

下面的 Spring Boot 应用程序提供使用 Java DSL 配置入站适配器的示例:

The following Spring Boot application provides an example of configuring the inbound adapter with the Java DSL:

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow debeziumInbound(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

        return IntegrationFlow
                .from(Debezium
                        .inboundChannelAdapter(debeziumEngineBuilder)
					    .headerNames("special*")
					    .contentType("application/json")
					    .enableBatch(false))
                .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
                .get();
    }

}