通道适配器

通道适配器是一种消息端点,它允许将单个发送者或接收者连接到消息通道。 Spring Integration 提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。 本参考指南的后续章节将讨论每个适配器。 然而,本章重点介绍简单但灵活的方法调用通道适配器支持。 有入站和出站适配器,每个都可以使用核心命名空间中提供的 XML 元素进行配置。 只要您有一个可以作为源或目标的调用方法,这些适配器就提供了一种扩展 Spring Integration 的简便方法。

配置入站通道适配器

inbound-channel-adapter 元素(在 Java 配置中是 SourcePollingChannelAdapter)可以调用 Spring 管理对象上的任何方法,并在将方法的输出转换为 Message 后将非空返回值发送到 MessageChannel。 当适配器的订阅被激活时,轮询器会尝试从源接收消息。 轮询器根据提供的配置使用 TaskScheduler 进行调度。 要为单个通道适配器配置轮询间隔或 cron 表达式,您可以提供一个带有调度属性(例如“fixed-rate”或“cron”)的“poller”元素。 以下示例定义了两个 inbound-channel-adapter 实例:

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>

如果没有提供轮询器,则必须在上下文中注册一个默认轮询器。 有关更多详细信息,请参阅 端点命名空间支持

轮询端点的默认触发器是一个 PeriodicTrigger 实例,具有 1 秒的固定延迟周期。

Example 1. 重要:轮询器配置

所有 inbound-channel-adapter 类型都由 SourcePollingChannelAdapter 支持,这意味着它们包含一个轮询器配置,该配置根据轮询器中指定的配置轮询 MessageSource(以调用生成成为 Message 有效载荷的值的自定义方法)。 以下示例显示了两个轮询器的配置:

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一个配置中,轮询任务每次轮询调用一次,并且在每个任务(轮询)期间,根据 max-messages-per-poll 属性值,该方法(产生消息)被调用一次。 在第二个配置中,轮询任务每次轮询调用 10 次或直到它返回“null”,因此每次轮询可能产生 10 条消息,而每次轮询以一秒的间隔发生。 但是,如果配置看起来像以下示例会发生什么:

<int:poller fixed-rate="1000"/>

请注意,没有指定 max-messages-per-poll。 正如我们稍后将介绍的,PollingConsumer(例如,service-activatorfilterrouter 等)中相同的轮询器配置的 max-messages-per-poll 默认值为 -1,这意味着“不停地执行轮询任务,除非轮询方法返回 null(可能因为 `QueueChannel 中没有更多消息)”,然后休眠一秒。 但是,在 `SourcePollingChannelAdapter 中,它有点不同。 max-messages-per-poll 的默认值为 1,除非您明确将其设置为负值(例如 -1)。 这确保了轮询器可以响应生命周期事件(例如启动和停止),并防止它在 MessageSource 的自定义方法的实现有可能永远不返回 null 并且恰好是不可中断的情况下,可能陷入无限循环。 但是,如果您确定您的方法可以返回 null,并且您需要为每次轮询轮询尽可能多的源,则应明确将 max-messages-per-poll 设置为负值,如以下示例所示:

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

从 5.5 版本开始,max-messages-per-poll0 值具有特殊含义——完全跳过 MessageSource.receive() 调用,这可以被认为是暂停此入站通道适配器,直到 maxMessagesPerPoll 在稍后通过控制总线等方式更改为非零值。 从 6.2 版本开始,fixed-delayfixed-rate 可以配置为 ISO 8601 Duration 格式,例如 PT10SP1D 等。 此外,底层 PeriodicTriggerinitial-interval 也以与 fixed-delayfixed-rate 相似的值格式公开。 另请参阅 全局默认轮询器 了解更多信息。

配置出站通道适配器

outbound-channel-adapter 元素(Java 配置的 @ServiceActivator)也可以将 MessageChannel 连接到任何 POJO 消费者方法,该方法应使用发送到该通道的消息的有效负载进行调用。 以下示例显示了如何定义出站通道适配器:

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果被适配的通道是 PollableChannel,您必须提供一个轮询器子元素(@ServiceActivator 上的 @Poller 子注解),如以下示例所示:

  • Java

  • XML

public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

如果 POJO 消费者实现可以在其他 <outbound-channel-adapter> 定义中重用,则应使用 ref 属性。 但是,如果消费者实现仅由单个 <outbound-channel-adapter> 定义引用,则可以将其定义为内部 bean,如以下示例所示:

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>

在同一 <outbound-channel-adapter> 配置中同时使用 ref 属性和内部处理程序定义是不允许的,因为它会创建模糊条件。 此类配置会导致抛出异常。

任何通道适配器都可以在没有 channel 引用的情况下创建,在这种情况下,它会隐式创建一个 DirectChannel 实例。 创建的通道的名称与 <inbound-channel-adapter><outbound-channel-adapter> 元素的 id 属性匹配。 因此,如果未提供 channel,则 id 是必需的。

通道适配器表达式和脚本

与许多其他 Spring Integration 组件一样,<inbound-channel-adapter><outbound-channel-adapter> 也提供 SpEL 表达式评估支持。 要使用 SpEL,请在“expression”属性中提供表达式字符串,而不是提供用于 bean 上方法调用的“ref”和“method”属性。 当表达式被评估时,它遵循与方法调用相同的契约,其中:<inbound-channel-adapter> 的表达式在评估结果为非空值时生成消息,而 <outbound-channel-adapter> 的表达式必须等同于返回 void 的方法调用。

从 Spring Integration 3.0 开始,<int:inbound-channel-adapter/> 也可以配置一个 SpEL <expression/>(甚至是一个 <script/>)子元素,以应对比简单“expression”属性更复杂的需求。 如果通过使用 location 属性提供脚本作为 Resource,您还可以设置 refresh-check-delay,这允许资源定期刷新。 如果您希望在每次轮询时检查脚本,则需要将此设置与轮询器的触发器进行协调,如以下示例所示:

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

另请参阅使用 <expression/> 子元素时的 ReloadableResourceBundleExpressionSource 上的 cacheSeconds 属性。 有关表达式的更多信息,请参阅 Spring 表达式语言 (SpEL)。 有关脚本,请参阅 Groovy 支持脚本支持

<int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一个端点,它通过定期触发轮询底层 MessageSource 来启动消息流。 由于在轮询时没有消息对象,表达式和脚本无法访问根 Message,因此在大多数其他消息 SpEL 表达式中没有可用的有效负载或头属性。 脚本可以生成并返回一个带有头和有效负载的完整 Message 对象,或者只返回一个有效负载,该有效负载由框架添加到带有基本头的消息中。