Routing Slip

从 4.1 版本开始,Spring Integration 提供了 routing slip 企业集成模式的实现。它作为 routingSlip 消息标头实现,用于确定 AbstractMessageProducingHandler 实例中的下一个通道,而没有为端点指定 outputChannel。当消息到达没有 output-channel 的端点时,将查询 routingSlip 以确定将消息发送到的下一个通道。当路由单耗尽时,正常的 replyChannel 处理将恢复。

Starting with version 4.1, Spring Integration provides an implementation of the routing slip enterprise integration pattern. It is implemented as a routingSlip message header, which is used to determine the next channel in AbstractMessageProducingHandler instances, when an outputChannel is not specified for the endpoint. This pattern is useful in complex, dynamic cases, when it can become difficult to configure multiple routers to determine message flow. When a message arrives at an endpoint that has no output-channel, the routingSlip is consulted to determine the next channel to which the message is sent. When the routing slip is exhausted, normal replyChannel processing resumes.

路由单据的配置作为 HeaderEnricher 选项呈现——一个由分号分隔的路由单据,它包含 path 条目,如下例所示:

Configuration for the routing slip is presented as a HeaderEnricher option — a semicolon-separated routing slip that contains path entries, as the following example shows:

<util:properties id="properties">
    <beans:prop key="myRoutePath1">channel1</beans:prop>
    <beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>

<context:property-placeholder properties-ref="properties"/>

<header-enricher input-channel="input" output-channel="process">
    <routing-slip
        value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
               routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>

上例具有:

The preceding example has:

  • A <context:property-placeholder> configuration to demonstrate that the entries in the routing slip path can be specified as resolvable keys.

  • The <header-enricher> <routing-slip> sub-element is used to populate the RoutingSlipHeaderValueMessageProcessor to the HeaderEnricher handler.

  • The RoutingSlipHeaderValueMessageProcessor accepts a String array of resolved routing slip path entries and returns (from processMessage()) a singletonMap with the path as key and 0 as initial routingSlipIndex.

路由单据 path 条目可以包含 MessageChannel Bean 名称、RoutingSlipRouteStrategy Bean 名称和 Spring 表达式 (SpEL)。RoutingSlipHeaderValueMessageProcessor 在第一次 processMessage 调用时根据 BeanFactory 检查每个路由单据 path 条目。它将条目(其不是应用程序上下文中 Bean 名称)转换为 ExpressionEvaluatingRoutingSlipRouteStrategy 实例。RoutingSlipRouteStrategy 条目多次调用,直到它们返回 null 或空 String 为止。

Routing Slip path entries can contain MessageChannel bean names, RoutingSlipRouteStrategy bean names, and Spring expressions (SpEL). The RoutingSlipHeaderValueMessageProcessor checks each routing slip path entry against the BeanFactory on the first processMessage invocation. It converts entries (which are not bean names in the application context) to ExpressionEvaluatingRoutingSlipRouteStrategy instances. RoutingSlipRouteStrategy entries are invoked multiple times, until they return null or an empty String.

由于路由单参与了 getOutputChannel 流程,因此我们有一个请求回复上下文。已经引入了 RoutingSlipRouteStrategy 来确定下一个 outputChannel,该 outputChannel 使用 requestMessagereply 对象。此策略的实现应在应用程序上下文中注册为一个 bean,它的 Bean 名称用于路由单 path。提供了 ExpressionEvaluatingRoutingSlipRouteStrategy 实现。它接受 SpEL 表达式,并将内部 ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply 对象用作求值上下文的根对象。这样做是为了避免为每次 ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath() 调用创建 EvaluationContext 的开销。它是一个简单的 Java bean,具有两个属性:Message<?> requestObject reply。利用这种表达式实现,我们可以使用 SpEL 指定路由单 path 条目(例如 @routingSlipRoutingPojo.get(request, reply)request.headers[myRoutingSlipChannel]),且无需为 RoutingSlipRouteStrategy 定义 bean。

Since the routing slip is involved in the getOutputChannel process, we have a request-reply context. The RoutingSlipRouteStrategy has been introduced to determine the next outputChannel that uses the requestMessage and the reply object. An implementation of this strategy should be registered as a bean in the application context, and its bean name is used in the routing slip path. The ExpressionEvaluatingRoutingSlipRouteStrategy implementation is provided. It accepts a SpEL expression and an internal ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply object is used as the root object of the evaluation context. This is to avoid the overhead of EvaluationContext creation for each ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath() invocation. It is a simple Java bean with two properties: Message<?> request and Object reply. With this expression implementation, we can specify routing slip path entries by using SpEL (for example, @routingSlipRoutingPojo.get(request, reply) and request.headers[myRoutingSlipChannel]) and avoid defining a bean for the RoutingSlipRouteStrategy.

requestMessage 参数始终是 Message<?>。根据上下文,答复对象可能是 Message<?>AbstractIntegrationMessageBuilder 或任意申请领域对象(例如,当其由服务激活程序调用的 POJO 方法返回时)。在首个两种情况下,在使用 SpEL(或 Java 实现)时,可以使用常规 Message 属性(payloadheaders)。对于任意领域对象,这些属性不可用。因此,谨慎地结合 POJO 方法使用路由函件,前提是使用结果确定下一路径。

The requestMessage argument is always a Message<?>. Depending on context, the reply object may be a Message<?>, an AbstractIntegrationMessageBuilder, or an arbitrary application domain object (when, for example, it is returned by a POJO method invoked by a service activator). In the first two cases, the usual Message properties (payload and headers) are available when using SpEL (or a Java implementation). For an arbitrary domain object, these properties are not available. For this reason, be careful when you use routing slips in conjunction with POJO methods if the result is used to determine the next path.

如果在分布式环境中涉及路由单据,我们建议不要对路由单据 path 使用行内表达式。此建议适用于分布式环境,例如在消息代理(如 AMQP SupportJMS Support)中使用 request-reply 的跨 JVM 应用程序,或在集成流中使用持久 MessageStore (Message Store)。框架使用 RoutingSlipHeaderValueMessageProcessor 将它们转换为 ExpressionEvaluatingRoutingSlipRouteStrategy 对象,并在 routingSlip 消息头中使用它们。由于此类不是 Serializable(它不可能是,因为它依赖于 BeanFactory),因此整个 Message 将变为不可序列化且在任何分布式操作中,我们最终都会得到 NotSerializableException。要克服此限制,请在 ExpressionEvaluatingRoutingSlipRouteStrategy 中使用期望的 SpEL 注册一个 bean,并在路由单据 path 配置中使用它的 bean 名称。

If a routing slip is involved in a distributed environment, we recommend not using inline expressions for the Routing Slip path. This recommendation applies to distributed environments such as cross-JVM applications, using a request-reply through a message broker (such asAMQP Support or JMS Support), or using a persistent MessageStore (Message Store) in the integration flow. The framework uses RoutingSlipHeaderValueMessageProcessor to convert them to ExpressionEvaluatingRoutingSlipRouteStrategy objects, and they are used in the routingSlip message header. Since this class is not Serializable (it cannot be, because it depends on the BeanFactory), the entire Message becomes non-serializable and, in any distributed operation, we end up with a NotSerializableException. To overcome this limitation, register an ExpressionEvaluatingRoutingSlipRouteStrategy bean with the desired SpEL and use its bean name in the routing slip path configuration.

对于 Java 配置,可以将 RoutingSlipHeaderValueMessageProcessor 实例添加到 HeaderEnricher Bean 定义中,如下例所示:

For Java configuration, you can add a RoutingSlipHeaderValueMessageProcessor instance to the HeaderEnricher bean definition, as the following example shows:

@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
    return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
            new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
                                                       "@routingSlipRoutingPojo.get(request, reply)",
                                                       "routingSlipRoutingStrategy",
                                                       "request.headers[myRoutingSlipChannel]",
                                                       "finishChannel")));
}

当一个端点生成应答且未定义 outputChannel 时,路由单据算法的工作方式如下:

The routing slip algorithm works as follows when an endpoint produces a reply and no outputChannel has been defined:

  • The routingSlipIndex is used to get a value from the routing slip path list.

  • If the value from routingSlipIndex is String, it is used to get a bean from BeanFactory.

  • If a returned bean is an instance of MessageChannel, it is used as the next outputChannel and the routingSlipIndex is incremented in the reply message header (the routing slip path entries remain unchanged).

  • If a returned bean is an instance of RoutingSlipRouteStrategy and its getNextPath does not return an empty String, that result is used as a bean name for the next outputChannel. The routingSlipIndex remains unchanged.

  • If RoutingSlipRouteStrategy.getNextPath returns an empty String or null, the routingSlipIndex is incremented and the getOutputChannelFromRoutingSlip is invoked recursively for the next Routing Slip path item.

  • If the next routing slip path entry is not a String, it must be an instance of RoutingSlipRouteStrategy.

  • When the routingSlipIndex exceeds the size of the routing slip path list, the algorithm moves to the default behavior for the standard replyChannel header.