JDBC 消息存储
Spring Integration 提供了两个 JDBC 特定的消息存储实现。
JdbcMessageStore
适用于聚合器和凭证检查模式。
JdbcChannelMessageStore
实现专门为消息通道提供了一个更具针对性和可扩展性的实现。
请注意,您可以使用 JdbcMessageStore
来支持消息通道,而 JdbcChannelMessageStore
则针对此目的进行了优化。
从 5.0.11、5.1.2 版本开始,JdbcChannelMessageStore
的索引已得到优化。
如果您的存储中有大型消息组,您可能希望更改索引。
此外,PriorityChannel
的索引被注释掉了,因为它只有在使用 JDBC 支持的此类通道时才需要。
当使用 |
初始化数据库
在开始使用 JDBC 消息存储组件之前,您应该为目标数据库提供适当的对象。
Spring Integration 附带了一些可用于初始化数据库的示例脚本。
在 spring-integration-jdbc
JAR 文件中,您可以在 org.springframework.integration.jdbc
包中找到脚本。
它为一系列常见数据库平台提供了示例创建脚本和示例删除脚本。
使用这些脚本的常见方法是在 Spring JDBC 数据源初始化器 中引用它们。
请注意,这些脚本作为示例和所需表名和列名的规范提供。
您可能会发现需要为生产使用增强它们(例如,通过添加索引声明)。
从 6.2 版本开始,JdbcMessageStore
、JdbcChannelMessageStore
、JdbcMetadataStore
和 DefaultLockRepository
实现了 SmartLifecycle
并在其 start()
方法中对各自的表执行 SELECT COUNT
查询,以确保所需表(根据提供的表前缀)存在于目标数据库中。
如果所需表不存在,应用程序上下文将无法启动。
可以通过 setCheckDatabaseOnStart(false)
禁用此检查。
通用 JDBC 消息存储
JDBC 模块提供了 Spring Integration MessageStore
(在凭证检查模式中很重要)和 MessageGroupStore
(在有状态模式(如聚合器)中很重要)的数据库支持实现。
这两个接口都由 JdbcMessageStore
实现,并且支持在 XML 中配置存储实例,如以下示例所示:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定 JdbcTemplate
而不是 DataSource
。
以下示例显示了一些其他可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>
在前面的示例中,我们为存储生成的查询中的表名指定了一个前缀。
表名前缀默认为 INT_
。
支持消息通道
如果您打算使用 JDBC 支持消息通道,我们建议使用 JdbcChannelMessageStore
实现。
它仅与消息通道结合使用。
支持的数据库
JdbcChannelMessageStore
使用特定于数据库的 SQL 查询从数据库中检索消息。
因此,您必须在 JdbcChannelMessageStore
上设置 ChannelMessageStoreQueryProvider
属性。
此 channelMessageStoreQueryProvider
为您指定的特定数据库提供 SQL 查询。
Spring Integration 支持以下关系型数据库:
-
PostgreSQL
-
HSQLDB
-
MySQL
-
Oracle
-
Derby
-
H2
-
SqlServer
-
Sybase
-
DB2
如果您的数据库未列出,您可以实现 ChannelMessageStoreQueryProvider
接口并提供您自己的自定义查询。
4.0 版本向表中添加了 MESSAGE_SEQUENCE
列,以确保即使消息在同一毫秒内存储,也能实现先进先出 (FIFO) 排队。
从 6.2 版本开始,ChannelMessageStoreQueryProvider
暴露了一个 isSingleStatementForPoll
标志,其中 PostgresChannelMessageStoreQueryProvider
返回 true
,其轮询查询现在基于单个 DELETE…RETURNING
语句。
JdbcChannelMessageStore
会查阅 isSingleStatementForPoll
选项,如果只支持单个轮询语句,则跳过单独的 DELETE
语句。
自定义消息插入
从 5.0 版本开始,通过重载 ChannelMessageStorePreparedStatementSetter
类,您可以为 JdbcChannelMessageStore
中的消息插入提供自定义实现。
您可以使用它来设置不同的列或更改表结构或序列化策略。
例如,您可以将其结构存储为 JSON 字符串,而不是默认的 byte[]
序列化。
以下示例使用 setValues
的默认实现来存储公共列,并覆盖该行为以将消息负载存储为 varchar
:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议使用关系型数据库进行排队。 相反,如果可能,请考虑使用 JMS 或 AMQP 支持的通道。 欲了解更多信息,请参阅以下资源:
如果您仍然计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在 后续部分 中描述。
并发轮询
轮询消息通道时,您可以选择为关联的 Poller
配置 TaskExecutor
引用。
请记住,如果您使用 JDBC 支持的消息通道,并且计划使用多个线程轮询通道并因此以事务方式轮询消息存储,则应确保使用支持 多版本并发控制 (MVCC) 的关系型数据库。
否则,锁定可能会成为问题,并且在使用多个线程时,性能可能无法如预期般实现。
例如,Apache Derby 在这方面存在问题。
为了实现更好的 JDBC 队列吞吐量并避免不同线程可能从队列中轮询相同 Message
时出现问题,在使用不支持 MVCC 的数据库时,将 JdbcChannelMessageStore
的 usingIdCache
属性设置为 true
是*重要*的。
以下示例展示了如何实现:
<bean id="queryProvider"
class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>
<task:executor id="pool" pool-size="10"
queue-capacity="10" rejection-policy="CALLER_RUNS" />
<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
<property name="region" value="TX_TIMEOUT"/>
<property name="usingIdCache" value="true"/>
</bean>
<int:channel id="inputChannel">
<int:queue message-store="store"/>
</int:channel>
<int:bridge input-channel="inputChannel" output-channel="outputChannel">
<int:poller fixed-delay="500" receive-timeout="500"
max-messages-per-poll="1" task-executor="pool">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
isolation="READ_COMMITTED" transaction-manager="transactionManager" />
</int:poller>
</int:bridge>
<int:channel id="outputChannel" />
优先级通道
从 4.0 版本开始,JdbcChannelMessageStore
实现了 PriorityCapableChannelMessageStore
并提供了 priorityEnabled
选项,使其可用作 priority-queue
实例的 message-store
引用。
为此,INT_CHANNEL_MESSAGE
表有一个 MESSAGE_PRIORITY
列来存储 PRIORITY
消息头的 H.
此外,一个新的 MESSAGE_SEQUENCE
列允许我们实现一个健壮的先进先出 (FIFO) 轮询机制,即使多个消息以相同的优先级在同一毫秒内存储。
消息从数据库中轮询(选择)时,使用 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
。
我们不建议将相同的 |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
分区消息存储
通常,JdbcMessageStore
用作一组应用程序或同一应用程序中节点的全局存储。
为了提供一些针对名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。
一种方法是使用单独的表名,通过更改前缀(如 前面所述)。
另一种方法是为 region
名称指定一个分区数据在单个表中的名称。
第二种方法的一个重要用例是当 MessageStore
管理支持 Spring Integration 消息通道的持久化队列时。
持久化通道的消息数据在存储中以通道名称为键。
因此,如果通道名称不是全局唯一的,则通道可能会获取不属于它们的数据。
为了避免这种危险,您可以使用消息存储 region
来保持具有相同逻辑名称的不同物理通道的数据分离。
PostgreSQL:接收推送通知
PostgreSQL 提供了监听和通知框架,用于在数据库表操作时接收推送通知。
Spring Integration 利用此机制(从 6.0 版本开始)允许在 JdbcChannelMessageStore
中添加新消息时接收推送通知。
使用此功能时,必须定义数据库触发器,该触发器可以在 Spring Integration 的 JDBC 模块中包含的 schema-postgresql.sql
文件的注释中找到。
推送通知通过 PostgresChannelMessageTableSubscriber
类接收,该类允许其订阅者在任何给定 region
和 groupId
的新消息到达时接收回调。
即使消息是在不同的 JVM 上但添加到同一个数据库中,也会收到这些通知。
PostgresSubscribableChannel
实现使用 PostgresChannelMessageTableSubscriber.Subscription
契约,作为对上述 PostgresChannelMessageTableSubscriber
通知做出反应,从存储中拉取消息。
例如,可以按如下方式接收 some group
的推送通知:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事务支持
从 6.0.5 版本开始,在 PostgresSubscribableChannel
上指定 PlatformTransactionManager
将在事务中通知订阅者。
订阅者中的异常将导致事务回滚并将消息放回消息存储。
事务支持默认不激活。
重试
从 6.0.5 版本开始,可以通过向 PostgresSubscribableChannel
提供 RetryTemplate
来指定重试策略。
默认情况下,不执行重试。
任何活动的 PostgresChannelMessageTableSubscriber
在其活动生命周期内都占用一个独占的 JDBC Connection
。
因此,重要的是此连接不是来自连接池 DataSource
。
此类连接池通常期望在预定义超时窗口内关闭已发出的连接。
由于需要独占连接,还建议 JVM 仅运行单个 PostgresChannelMessageTableSubscriber
,该订阅者可用于注册任意数量的订阅。