入站通道适配器

入站通道适配器的主要功能是执行 SQL SELECT 查询并将结果集转换为消息。消息负载是整个结果集(表示为 List),列表中各项的类型取决于行映射策略。默认策略是一个通用映射器,为查询结果中的每一行返回一个 Map。您可以通过添加对 RowMapper 实例的引用来选择性地更改此策略(有关行映射的更详细信息,请参阅 Spring JDBC 文档)。

如果您想将 SELECT 查询结果中的行转换为单独的消息,可以使用下游的分裂器。

入站适配器还需要引用 JdbcTemplate 实例或 DataSource。除了生成消息的 SELECT 语句外,适配器还有一个 UPDATE 语句,用于将记录标记为已处理,以便它们不会在下次轮询中出现。更新可以通过原始选择中的 ID 列表进行参数化。默认情况下,这是通过命名约定完成的(输入结果集中名为 id 的列被转换为更新的参数映射中名为 id 的列表)。以下示例定义了一个带有更新查询和 DataSource 引用的入站通道适配器。

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />

更新查询中的参数以冒号(:)作为参数名称的前缀(在前面的示例中,这是一个应用于轮询结果集中每一行的表达式)。这是 Spring JDBC 中命名参数 JDBC 支持的标准特性,结合 Spring Integration 中采用的约定(投影到轮询结果列表)。底层的 Spring JDBC 特性限制了可用的表达式(例如,除了句点之外的大多数特殊字符都不允许),但由于目标通常是可通过 Bean 路径寻址的对象列表(可能是单个对象的列表),因此这并不过于严格。

要更改参数生成策略,您可以将 SqlParameterSourceFactory 注入适配器以覆盖默认行为(适配器有一个 sql-parameter-source-factory 属性)。Spring Integration 提供了 ExpressionEvaluatingSqlParameterSourceFactory,它创建一个基于 SpEL 的参数源,查询结果作为 #root 对象。(如果 update-per-row 为 true,则根对象是行)。如果相同的参数名在更新查询中多次出现,它只被评估一次,并且其结果会被缓存。您还可以为 select 查询使用参数源。在这种情况下,由于没有可供评估的“result”对象,每次都使用单个参数源(而不是使用参数源工厂)。从 4.0 版本开始,您可以使用 Spring 创建基于 SpEL 的参数源,如以下示例所示:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

每个参数表达式中的 value 可以是任何有效的 SpEL 表达式。表达式评估的 #root 对象是 parameterSource Bean 上定义的构造函数参数。(在前面的示例中,是一个空的 String)。从 5.0 版本开始,您可以为 ExpressionEvaluatingSqlParameterSourceFactory 提供 sqlParameterTypes 以指定特定参数的目标 SQL 类型。以下示例为查询中使用的参数提供了 SQL 类型:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>

使用 createParameterSourceNoCache 工厂方法。否则,参数源会缓存评估结果。另请注意,由于缓存被禁用,如果相同的参数名在 select 查询中多次出现,它会在每次出现时重新评估。

轮询和事务

入站适配器接受常规的 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。对于 JDBC 用法,轮询器的一个重要特性是将轮询操作包装在事务中的选项,如以下示例所示:

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>

如果您没有显式指定轮询器,则使用默认值。(与 Spring Integration 通常一样,它可以定义为顶级 Bean)。

在前面的示例中,数据库每 1000 毫秒(或每秒一次)轮询一次,并且更新和选择查询都在同一个事务中执行。事务管理器配置未显示。但是,只要它知道数据源,轮询就是事务性的。一个常见的用例是下游通道是直接通道(默认),以便在同一个线程中调用端点,从而在同一个事务中。这样,如果其中任何一个失败,事务就会回滚,并且输入数据会恢复到其原始状态。

max-rowsmax-messages-per-poll

JDBC 入站通道适配器定义了一个名为 max-rows 的属性。当您指定适配器的轮询器时,您还可以定义一个名为 max-messages-per-poll 的属性。虽然这两个属性看起来相似,但它们的含义却大相径庭。

max-messages-per-poll 指定每个轮询间隔执行查询的次数,而 max-rows 指定每次执行返回的行数。

在正常情况下,您可能不想在使用 JDBC 入站通道适配器时设置轮询器的 max-messages-per-poll 属性。它的默认值为 1,这意味着 JDBC 入站通道适配器的 receive() 方法在每个轮询间隔只执行一次。

max-messages-per-poll 属性设置为更大的值意味着查询会连续执行多次。有关 max-messages-per-poll 属性的更多信息,请参阅 配置入站通道适配器

相反,max-rows 属性,如果大于 0,则指定从 receive() 方法创建的查询结果集中使用的最大行数。如果属性设置为 0,则所有行都包含在结果消息中。该属性默认为 0

建议通过特定于供应商的查询选项来限制结果集,例如 MySQL 的 LIMIT 或 SQL Server 的 TOP 或 Oracle 的 ROWNUM。有关更多信息,请参阅特定供应商的文档。