Debezium 支持

Debezium Engine,变更数据捕获 (CDC) 入站通道适配器。 DebeziumMessageProducer 允许捕获数据库变更事件,将其转换为消息,然后将其流式传输到出站通道。 您需要将 spring integration Debezium 依赖项包含到您的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:{project-version}"

您还需要为您的输入数据库包含一个 debezium 连接器 依赖项。 例如,要将 Debezium 与 PostgreSQL 一起使用,您将需要 postgres debezium 连接器:

  • Maven

  • Gradle

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"

debezium-version 替换为与正在使用的 spring-integration-debezium 版本兼容的版本。

入站 Debezium 通道适配器

Debezium 适配器需要一个预配置的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> 实例。

debezium-supplier 提供了一个开箱即用的 DebeziumEngine.Builder Spring Boot 自动配置,以及一个方便的 DebeziumProperties 配置抽象。

debezium-java-dsl 可以从提供的 DebeziumEngine.Builder 以及普通的 Debezium 配置(例如 java.util.Properties)创建 DebeziumMessageProducer 实例。 后者对于某些具有特定配置和序列化格式的常见用例可能很方便。

此外,DebeziumMessageProducer 可以通过以下配置属性进行调整:

  • contentType - 允许处理 JSON(默认)、AVROPROTOBUF 消息内容。 contentType 必须与为提供的 DebeziumEngine.Builder 配置的 SerializationFormat 对齐。

  • enableBatch - 当设置为 false(默认)时,debezium 适配器会为从源数据库接收到的每个 ChangeEvent 数据变更事件发送新的 Message。 如果设置为 true,则适配器会为从 Debezium 引擎接收到的每个 ChangeEvent 批次发送一个 Message。 此类有效负载不可序列化,需要自定义序列化/反序列化实现。

  • enableEmptyPayload - 启用对墓碑(又称删除)消息的支持。 在数据库行删除时,Debezium 可以发送一个墓碑变更事件,该事件具有与已删除行相同的键和 Optional.empty 值。 默认为 false

  • headerMapper - 自定义 HeaderMapper 实现,允许选择和转换 ChangeEvent 标头为 Message 标头。 默认的 DefaultDebeziumHeaderMapper 实现提供了 setHeaderNamesToMap 的 setter。 默认情况下,所有标头都将被映射。

  • taskExecutor - 为 Debezium 引擎设置自定义 TaskExecutor

以下代码片段演示了此通道适配器的各种配置:

使用 Java 配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {

        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {

        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); [id="CO1-1"][id="CO1-1"](1)

        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); [id="CO1-2"][id="CO1-2"](2)

        String payload = new String((byte[]) message.getPayload()); [id="CO1-3"][id="CO1-3"](3)

        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }

}
 <1> 事件的目标逻辑名称。
通常,目标由 `topic.prefix` 配置选项、数据库名称和表名称组成。例如:`my-topic.inventory.orders`。
 <2> 包含更改表的键的模式和更改行的实际键。
键模式及其对应的键有效负载都包含更改表在连接器创建事件时 `PRIMARY KEY`(或唯一约束)中的每个字段。
 <3> 与键类似,有效负载具有模式部分和有效负载值部分。
模式部分包含描述有效负载值部分的 Envelope 结构的模式,包括其嵌套字段。
创建、更新或删除数据的操作的变更事件都具有带有信封结构的有效负载值。

key.converter.schemas.enable=false 和/或 value.converter.schemas.enable=false 允许分别禁用键或有效负载中的消息内模式内容。

同样,我们可以配置 DebeziumMessageProducer 以批处理方式处理传入的变更事件:

@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {

    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
	debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}

Debezium Java DSL 支持

spring-integration-debezium 通过 Debezium 工厂和 DebeziumMessageProducerSpec 实现提供了方便的 Java DSL 流式 API。

Debezium Java DSL 的入站通道适配器是:

 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>   debeziumEngineBuilder = ...
 IntegrationFlow.from(
    Debezium.inboundChannelAdapter(debeziumEngineBuilder)
        .headerNames("special*")
        .contentType("application/json")
        .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

或者从原生 debezium 配置属性创建 DebeziumMessageProducerSpec 实例,并默认为 JSON 序列化格式。

 Properties debeziumConfig = ...
 IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow debeziumInbound(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

        return IntegrationFlow
                .from(Debezium
                        .inboundChannelAdapter(debeziumEngineBuilder)
					    .headerNames("special*")
					    .contentType("application/json")
					    .enableBatch(false))
                .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
                .get();
    }

}