路由器实现

由于基于内容的路由通常需要一些领域特定的逻辑,因此大多数用例都需要 Spring Integration 通过使用 XML 命名空间支持或注解来委托给 POJO 的选项。 这两者将在后面讨论。 但是,我们首先介绍几个满足常见需求的实现。

PayloadTypeRouter

PayloadTypeRouter 根据有效负载类型映射将消息发送到定义的通道,如以下示例所示:

<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMapping">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

Spring Integration 提供的命名空间也支持 PayloadTypeRouter 的配置(参见 命名空间支持),它通过将 <router/> 配置及其相应的实现(通过使用 <bean/> 元素定义)组合成一个更简洁的配置元素来简化配置。 以下示例显示了一个 PayloadTypeRouter 配置,它与上面的配置等效,但使用了命名空间支持:

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel" />
    <int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

使用 Java DSL 时,有两种选择。

首先,您可以定义路由器对象,如前面的示例所示:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

请注意,路由器可以是 @Bean,但不是必须是。 如果它不是 @Bean,流会注册它。

其次,您可以在 DSL 流本身中定义路由函数,如以下示例所示:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(String.class, "stringChannel")
                    .channelMapping(Integer.class, "integerChannel"))
            .get();
}

HeaderValueRouter

HeaderValueRouter 根据单个头部值映射将消息发送到通道。 创建 HeaderValueRouter 时,它会用要评估的头部名称进行初始化。 头部的值可以是以下两种情况之一:

  • 任意值

  • 通道名称

如果它是任意值,则需要将这些头部值映射到通道名称的额外映射。 否则,不需要额外的配置。

Spring Integration 提供了一个简单的基于命名空间的 XML 配置来配置 HeaderValueRouter。 以下示例演示了当需要将头部值映射到通道时 HeaderValueRouter 的配置:

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA" />
    <int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

在解析过程中,前面示例中定义的路由器可能会遇到通道解析失败,从而导致异常。 如果您想抑制此类异常并将未解析的消息发送到默认输出通道(由 default-output-channel 属性标识),请将 resolution-required 设置为 false

通常,头部值未明确映射到通道的消息会发送到 default-output-channel。 但是,当头部值映射到通道名称但通道无法解析时,将 resolution-required 属性设置为 false 会导致将此类消息路由到 default-output-channel

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

使用 Java DSL 时,有两种选择。 首先,您可以定义路由器对象,如前面的示例所示:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

请注意,路由器可以是 @Bean,但不是必须是。 如果它不是 @Bean,流会注册它。

其次,您可以在 DSL 流本身中定义路由函数,如以下示例所示:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
                    m -> m
                        .channelMapping("someHeaderValue", "channelA")
                        .channelMapping("someOtherHeaderValue", "channelB"),
                e -> e.id("headerValueRouter"))
            .get();
}

当不需要将头部值映射到通道名称时进行配置,因为头部值本身表示通道名称。 以下示例显示了一个不需要将头部值映射到通道名称的路由器:

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>

自 Spring Integration 2.1 以来,通道解析的行为更加明确。 例如,如果您省略 default-output-channel 属性,路由器无法解析至少一个有效通道,并且通过将 resolution-required 设置为 false 忽略了任何通道名称解析失败,那么将抛出 MessageDeliveryException。 基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。 如果您确实想丢弃消息,则还必须将 default-output-channel 设置为 nullChannel

RecipientListRouter

RecipientListRouter 将每个接收到的消息发送到静态定义的 메시지 通道列表。 以下示例创建了一个 RecipientListRouter

<bean id="recipientListRouter"
      class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Spring Integration 还支持 RecipientListRouter 配置的命名空间(参见 命名空间支持),如以下示例所示:

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <int:recipient channel="channel1"/>
  <int:recipient channel="channel2"/>
</int:recipient-list-router>

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
    RecipientListRouter router = new RecipientListRouter();
    router.setSendTimeout(1_234L);
    router.setIgnoreSendFailures(true);
    router.setApplySequence(true);
    router.addRecipient("channel1");
    router.addRecipient("channel2");
    router.addRecipient("channel3");
    return router;
}

以下示例显示了使用 Java DSL 配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("routingChannel")
            .routeToRecipients(r -> r
                    .applySequence(true)
                    .ignoreSendFailures(true)
                    .recipient("channel1")
                    .recipient("channel2")
                    .recipient("channel3")
                    .sendTimeout(1_234L))
            .get();
}

这里的 'apply-sequence' 标志与 publish-subscribe-channel 的效果相同,并且,与 publish-subscribe-channel 一样,它在 recipient-list-router 上默认禁用。 有关更多信息,请参阅 PublishSubscribeChannel 配置

配置 RecipientListRouter 时另一个方便的选项是使用 Spring Expression Language (SpEL) 支持作为单个收件人通道的选择器。 这样做类似于在“链”的开头使用过滤器作为“选择性消费者”。 但是,在这种情况下,它都非常简洁地组合到路由器的配置中,如以下示例所示:

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
    <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
    <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

在前面的配置中,由 selector-expression 属性标识的 SpEL 表达式被评估,以确定此收件人是否应包含在给定输入消息的收件人列表中。 表达式的评估结果必须是 boolean。 如果未定义此属性,则该通道始终在收件人列表中。

RecipientListRouterManagement

从 4.1 版开始,RecipientListRouter 提供了多种操作,可在运行时动态操作收件人。 这些管理操作通过 @ManagedResource 注解由 RecipientListRouterManagement 提供。 它们可以通过使用 控制总线 以及使用 JMX 获得,如以下示例所示:

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
   <recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>
Message<?> addRecipientCommandMessage =
                     MessageBuilder.withPayload("'simpleRouter.handler'.addRecipient")
                            .setHeader(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of("channel2"))
                            .build();

从应用程序启动开始,simpleRouter 只有一个 channel1 收件人。 但在 addRecipient 命令之后,添加了 channel2 收件人。 这是一个“注册对消息中某个部分的兴趣”的用例,我们可能在某个时间段内对来自路由器的消息感兴趣,因此我们订阅了 recipient-list-router,并在某个时候决定取消订阅。

由于 <recipient-list-router> 的运行时管理操作,它可以从一开始就配置而无需任何 <recipient>。 在这种情况下,当消息没有匹配的收件人时,RecipientListRouter 的行为是相同的。 如果配置了 defaultOutputChannel,则消息会发送到那里。 否则,将抛出 MessageDeliveryException

XPath 路由器

XPath 路由器是 XML 模块的一部分。 请参阅 使用 XPath 路由 XML 消息

路由和错误处理

Spring Integration 还提供了一种特殊的基于类型的路由器,称为 ErrorMessageExceptionTypeRouter,用于路由错误消息(定义为 payloadThrowable 实例的消息)。 ErrorMessageExceptionTypeRouter 类似于 PayloadTypeRouter。 事实上,它们几乎相同。 唯一的区别是,PayloadTypeRouter 遍历有效负载实例的实例层次结构(例如,payload.getClass().getSuperclass())以查找最具体的类型和通道映射,而 ErrorMessageExceptionTypeRouter 遍历“异常原因”的层次结构(例如,payload.getCause())以查找最具体的 Throwable 类型或通道映射,并使用 mappingClass.isInstance(cause)cause 与类或任何超类匹配。

在这种情况下,通道映射顺序很重要。 因此,如果需要获取 IllegalArgumentException 的映射,而不是 RuntimeException,则必须首先在路由器上配置后者。

自 4.3 版以来,ErrorMessageExceptionTypeRouter 在初始化阶段加载所有映射类,以便在 ClassNotFoundException 时快速失败。

以下示例显示了 ErrorMessageExceptionTypeRouter 的示例配置:

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • XML DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f
            .routeByException(r -> r
                 .channelMapping(IllegalArgumentException.class, "illegalChannel")
                 .channelMapping(NullPointerException.class, "npeChannel")
                 .defaultOutputChannel("defaultChannel"));
}
@Bean
fun someFlow() =
    integrationFlow {
        routeByException {
                    channelMapping(IllegalArgumentException::class.java, "illegalChannel")
                    channelMapping(NullPointerException::class.java, "npeChannel")
                    defaultOutputChannel("defaultChannel")
                }
    }
@Bean
someFlow() {
    integrationFlow {
        routeByException {
            channelMapping IllegalArgumentException, 'illegalChannel'
            channelMapping NullPointerException, 'npeChannel'
            defaultOutputChannel 'defaultChannel'
        }
    }
}
<int:exception-type-router input-channel="inputChannel"
                           default-output-channel="defaultChannel">
    <int:mapping exception-type="java.lang.IllegalArgumentException"
                 channel="illegalChannel"/>
    <int:mapping exception-type="java.lang.NullPointerException"
                 channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />