JDBC 消息存储

Spring Integration 提供了两个 JDBC 特定的消息存储实现。 JdbcMessageStore 适用于聚合器和凭证检查模式。 JdbcChannelMessageStore 实现专门为消息通道提供了一个更具针对性和可扩展性的实现。 请注意,您可以使用 JdbcMessageStore 来支持消息通道,而 JdbcChannelMessageStore 则针对此目的进行了优化。

从 5.0.11、5.1.2 版本开始,JdbcChannelMessageStore 的索引已得到优化。 如果您的存储中有大型消息组,您可能希望更改索引。 此外,PriorityChannel 的索引被注释掉了,因为它只有在使用 JDBC 支持的此类通道时才需要。

当使用 OracleChannelMessageStoreQueryProvider 时,必须 添加优先级通道索引,因为它包含在查询的提示中。

初始化数据库

在开始使用 JDBC 消息存储组件之前,您应该为目标数据库提供适当的对象。

Spring Integration 附带了一些可用于初始化数据库的示例脚本。 在 spring-integration-jdbc JAR 文件中,您可以在 org.springframework.integration.jdbc 包中找到脚本。 它为一系列常见数据库平台提供了示例创建脚本和示例删除脚本。 使用这些脚本的常见方法是在 Spring JDBC 数据源初始化器 中引用它们。 请注意,这些脚本作为示例和所需表名和列名的规范提供。 您可能会发现需要为生产使用增强它们(例如,通过添加索引声明)。

从 6.2 版本开始,JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository 实现了 SmartLifecycle 并在其 start() 方法中对各自的表执行 SELECT COUNT 查询,以确保所需表(根据提供的表前缀)存在于目标数据库中。 如果所需表不存在,应用程序上下文将无法启动。 可以通过 setCheckDatabaseOnStart(false) 禁用此检查。

通用 JDBC 消息存储

JDBC 模块提供了 Spring Integration MessageStore(在凭证检查模式中很重要)和 MessageGroupStore(在有状态模式(如聚合器)中很重要)的数据库支持实现。 这两个接口都由 JdbcMessageStore 实现,并且支持在 XML 中配置存储实例,如以下示例所示:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定 JdbcTemplate 而不是 DataSource

以下示例显示了一些其他可选属性:

<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>

在前面的示例中,我们为存储生成的查询中的表名指定了一个前缀。 表名前缀默认为 INT_

支持消息通道

如果您打算使用 JDBC 支持消息通道,我们建议使用 JdbcChannelMessageStore 实现。 它仅与消息通道结合使用。

支持的数据库

JdbcChannelMessageStore 使用特定于数据库的 SQL 查询从数据库中检索消息。 因此,您必须在 JdbcChannelMessageStore 上设置 ChannelMessageStoreQueryProvider 属性。 此 channelMessageStoreQueryProvider 为您指定的特定数据库提供 SQL 查询。 Spring Integration 支持以下关系型数据库:

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

如果您的数据库未列出,您可以实现 ChannelMessageStoreQueryProvider 接口并提供您自己的自定义查询。

4.0 版本向表中添加了 MESSAGE_SEQUENCE 列,以确保即使消息在同一毫秒内存储,也能实现先进先出 (FIFO) 排队。

从 6.2 版本开始,ChannelMessageStoreQueryProvider 暴露了一个 isSingleStatementForPoll 标志,其中 PostgresChannelMessageStoreQueryProvider 返回 true,其轮询查询现在基于单个 DELETE…​RETURNING 语句。 JdbcChannelMessageStore 会查阅 isSingleStatementForPoll 选项,如果只支持单个轮询语句,则跳过单独的 DELETE 语句。

自定义消息插入

从 5.0 版本开始,通过重载 ChannelMessageStorePreparedStatementSetter 类,您可以为 JdbcChannelMessageStore 中的消息插入提供自定义实现。 您可以使用它来设置不同的列或更改表结构或序列化策略。 例如,您可以将其结构存储为 JSON 字符串,而不是默认的 byte[] 序列化。

以下示例使用 setValues 的默认实现来存储公共列,并覆盖该行为以将消息负载存储为 varchar

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我们不建议使用关系型数据库进行排队。 相反,如果可能,请考虑使用 JMS 或 AMQP 支持的通道。 欲了解更多信息,请参阅以下资源:

如果您仍然计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在 后续部分 中描述。

并发轮询

轮询消息通道时,您可以选择为关联的 Poller 配置 TaskExecutor 引用。

请记住,如果您使用 JDBC 支持的消息通道,并且计划使用多个线程轮询通道并因此以事务方式轮询消息存储,则应确保使用支持 多版本并发控制 (MVCC) 的关系型数据库。 否则,锁定可能会成为问题,并且在使用多个线程时,性能可能无法如预期般实现。 例如,Apache Derby 在这方面存在问题。 为了实现更好的 JDBC 队列吞吐量并避免不同线程可能从队列中轮询相同 Message 时出现问题,在使用不支持 MVCC 的数据库时,将 JdbcChannelMessageStoreusingIdCache 属性设置为 true 是*重要*的。 以下示例展示了如何实现:

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

优先级通道

从 4.0 版本开始,JdbcChannelMessageStore 实现了 PriorityCapableChannelMessageStore 并提供了 priorityEnabled 选项,使其可用作 priority-queue 实例的 message-store 引用。 为此,INT_CHANNEL_MESSAGE 表有一个 MESSAGE_PRIORITY 列来存储 PRIORITY 消息头的 H. 此外,一个新的 MESSAGE_SEQUENCE 列允许我们实现一个健壮的先进先出 (FIFO) 轮询机制,即使多个消息以相同的优先级在同一毫秒内存储。 消息从数据库中轮询(选择)时,使用 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE

我们不建议将相同的 JdbcChannelMessageStore bean 用于优先级和非优先级队列通道,因为 priorityEnabled 选项适用于整个存储,并且队列通道无法保留正确的 FIFO 队列语义。 但是,相同的 INT_CHANNEL_MESSAGE 表(甚至 region)可以用于两种 JdbcChannelMessageStore 类型。 要配置该场景,您可以从另一个消息存储 bean 扩展一个消息存储 bean,如以下示例所示:

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

分区消息存储

通常,JdbcMessageStore 用作一组应用程序或同一应用程序中节点的全局存储。 为了提供一些针对名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。 一种方法是使用单独的表名,通过更改前缀(如 前面所述)。 另一种方法是为 region 名称指定一个分区数据在单个表中的名称。 第二种方法的一个重要用例是当 MessageStore 管理支持 Spring Integration 消息通道的持久化队列时。 持久化通道的消息数据在存储中以通道名称为键。 因此,如果通道名称不是全局唯一的,则通道可能会获取不属于它们的数据。 为了避免这种危险,您可以使用消息存储 region 来保持具有相同逻辑名称的不同物理通道的数据分离。

PostgreSQL:接收推送通知

PostgreSQL 提供了监听和通知框架,用于在数据库表操作时接收推送通知。 Spring Integration 利用此机制(从 6.0 版本开始)允许在 JdbcChannelMessageStore 中添加新消息时接收推送通知。 使用此功能时,必须定义数据库触发器,该触发器可以在 Spring Integration 的 JDBC 模块中包含的 schema-postgresql.sql 文件的注释中找到。

推送通知通过 PostgresChannelMessageTableSubscriber 类接收,该类允许其订阅者在任何给定 regiongroupId 的新消息到达时接收回调。 即使消息是在不同的 JVM 上但添加到同一个数据库中,也会收到这些通知。 PostgresSubscribableChannel 实现使用 PostgresChannelMessageTableSubscriber.Subscription 契约,作为对上述 PostgresChannelMessageTableSubscriber 通知做出反应,从存储中拉取消息。

例如,可以按如下方式接收 some group 的推送通知:

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

事务支持

从 6.0.5 版本开始,在 PostgresSubscribableChannel 上指定 PlatformTransactionManager 将在事务中通知订阅者。 订阅者中的异常将导致事务回滚并将消息放回消息存储。 事务支持默认不激活。

重试

从 6.0.5 版本开始,可以通过向 PostgresSubscribableChannel 提供 RetryTemplate 来指定重试策略。 默认情况下,不执行重试。

任何活动的 PostgresChannelMessageTableSubscriber 在其活动生命周期内都占用一个独占的 JDBC Connection。 因此,重要的是此连接不是来自连接池 DataSource。 此类连接池通常期望在预定义超时窗口内关闭已发出的连接。 由于需要独占连接,还建议 JVM 仅运行单个 PostgresChannelMessageTableSubscriber,该订阅者可用于注册任意数量的订阅。