Asynchronous Outbound Gateway

上一部分讨论的网关是同步的,发送线程会被挂起,直到收到回复(或出现超时)。Spring Integration 版本 4.3 添加了一个异步网关,该网关使用 Spring AMQP 中的 AsyncRabbitTemplate。当发送消息时,线程在发送操作完成后立即返回,当收到消息时,会在模板的监听器容器线程上发送回复。这在对轮询线程调用网关时很有用。线程会释放,并且可在框架中用于其他任务。

The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a reply is received (or a timeout occurs). Spring Integration version 4.3 added an asynchronous gateway, which uses the AsyncRabbitTemplate from Spring AMQP. When a message is sent, the thread returns immediately after the send operation completes, and, when the message is received, the reply is sent on the template’s listener container thread. This can be useful when the gateway is invoked on a poller thread. The thread is released and is available for other tasks in the framework.

以下清单显示了 AMQP 异步出站网关的可能配置选项:

The following listing shows the possible configuration options for an AMQP asynchronous outbound gateway:

  • Java DSL

  • Java

  • XML

@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    1
                           request-channel="myRequestChannel" 2
                           async-template=""                  3
                           exchange-name=""                   4
                           exchange-name-expression=""        5
                           order="1"                          6
                           reply-channel=""                   7
                           reply-timeout=""                   8
                           requires-reply=""                  9
                           routing-key=""                     10
                           routing-key-expression=""          11
                           default-delivery-mode""            12
                           confirm-correlation-expression=""  13
                           confirm-ack-channel=""             14
                           confirm-nack-channel=""            15
                           confirm-timeout=""                 16
                           return-channel=""                  17
                           lazy-connect="true" />             18
1 此适配器的唯一 ID。可选。
2 The unique ID for this adapter. Optional.
3 为了将消息转换为 AMQP 交换并将消息发布到 AMQP 交换,应将消息发送到的消息通道。必需的。
4 Message channel to which messages should be sent in order to have them converted and published to an AMQP exchange. Required.
5 对已配置的 AsyncRabbitTemplate 的 bean 引用。可选(默认值为 asyncRabbitTemplate)。
6 Bean reference to the configured AsyncRabbitTemplate. Optional (it defaults to asyncRabbitTemplate).
7 应将消息发送到的 AMQP 交换的名称。如果没有提供,则消息将发送到默认的无名交换。与“exchange-name-expression”互斥。可选。
8 The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name-expression'. Optional.
9 一个 SpEL 表达式,评估后确定发送消息的 AMQP 交换的名称,其中消息为根对象。如果没有提供,则消息将发送到默认的无名交换。与“exchange-name”互斥。可选。
10 A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional.
11 当注册多个使用者时的使用者顺序,从而启用负载平衡和故障转移。可选(默认值为 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
12 The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover. Optional (it defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
13 从 AMQP 队列接收并转换后,应发送响应到的消息通道。可选。
14 Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional.
15 网关在将应答消息发送到 reply-channel 等待的时间。这仅适用于 reply-channel 可以阻止的情况——比如,容量已满的 QueueChannel。默认值为无穷大。
16 The time the gateway waits when sending the reply message to the reply-channel. This only applies if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full. The default is infinity.
17 AsyncRabbitTemplate&#8217;s `receiveTimeout 属性内未收到应答消息,并且此设置是 true 时,网关会向入站消息的 errorChannel 标头发送一条错误消息。在 AsyncRabbitTemplate&#8217;s `receiveTimeout 属性内未收到应答消息,并且此设置是 false 时,网关会向默认的 errorChannel(如果可用)发送一条错误消息。其默认值为 true
18 When no reply message is received within the AsyncRabbitTemplate’s `receiveTimeout property and this setting is true, the gateway sends an error message to the inbound message’s errorChannel header. When no reply message is received within the AsyncRabbitTemplate’s `receiveTimeout property and this setting is false, the gateway sends an error message to the default errorChannel (if available). It defaults to true.
19 发送消息时使用的路由键。默认情况下,这是一个空 String。与“routing-key-expression”互斥。可选。
20 The routing-key to use when sending Messages. By default, this is an empty String. Mutually exclusive with 'routing-key-expression'. Optional.
21 一个 SpEL 表达式,评估后确定发送消息时使用的 routing-key,其中消息为根对象(例如,“payload.key”)。默认情况下,这是一个空 String。与“routing-key”互斥。可选。
22 A SpEL expression that is evaluated to determine the routing-key to use when sending messages, with the message as the root object (for example, 'payload.key'). By default, this is an empty String. Mutually exclusive with 'routing-key'. Optional.
23 消息的默认传递模式:PERSISTENTNON_PERSISTENT。如果 header-mapper 设置了传递模式,则会覆盖该模式。如果 Spring 集成消息标头 (amqp_deliveryMode) 已存在,则 DefaultHeaderMapper 将设置该值。如果未提供此属性,且标头映射器未设置它,则默认值取决于 RabbitTemplate 所使用的底层 Spring AMQP MessagePropertiesConverter。如果未自定义该值,则默认值为 PERSISTENT。可选。
24 The default delivery mode for messages: PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. If the Spring Integration message header (amqp_deliveryMode) is present, the DefaultHeaderMapper sets the value. If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate. If that is not customized, the default is PERSISTENT. Optional.
25 定义相关性数据的表达式。提供后,这会配置底层 AMQP 模板以接收发布者确认。需要专用的 RabbitTemplate 和一个 CachingConnectionFactory,其 publisherConfirms 属性设置为 true。当收到发布者确认且提供了相关性数据时,该确认将写入 confirm-ack-channelconfirm-nack-channel,具体取决于确认类型。确认的负载是此表达式定义的相关性数据,且消息的“amqp_publishConfirm”标头设置为 true (ack) 或 false (nack)。对于 nack 实例,额外提供了一个标头 (amqp_publishConfirmNackCause)。示例:headers['myCorrelationData']payload。如果表达式解析为 Message&lt;?&gt; 实例(例如,“#this”),则 ack/nack 通道上发出的消息基于该消息,并添加额外的标头。另请参见 Alternative Mechanism for Publisher Confirms and Returns。可选。
26 An expression that defines correlation data. When provided, this configures the underlying AMQP template to receive publisher confirmations. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with its publisherConfirms property set to true. When a publisher confirmation is received and correlation data is supplied, the confirmation is written to either the confirm-ack-channel or the confirm-nack-channel, depending on the confirmation type. The payload of the confirmation is the correlation data as defined by this expression, and the message has its 'amqp_publishConfirm' header set to true (ack) or false (nack). For nack instances, an additional header (amqp_publishConfirmNackCause) is provided. Examples: headers['myCorrelationData'], payload. If the expression resolves to a Message<?> instance (such as “#this”), the message emitted on the ack/nack channel is based on that message, with the additional headers added. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
27 将正向(ack)发布者确认发送的通道。负载是 confirm-correlation-expression 定义的相关性数据。需要底层的 AsyncRabbitTemplate 将其 enableConfirms 属性设置为 true。另请参见 Alternative Mechanism for Publisher Confirms and Returns。可选(默认值为 nullChannel)。
28 The channel to which positive (ack) publisher confirmations are sent. The payload is the correlation data defined by the confirm-correlation-expression. Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
29 4.2 版之后。将负向(nack)发布者确认发送的通道。负载是 confirm-correlation-expression 定义的相关性数据。需要底层的 AsyncRabbitTemplate 将其 enableConfirms 属性设置为 true。另请参见 Alternative Mechanism for Publisher Confirms and Returns。可选(默认值为 nullChannel)。
30 Since version 4.2. The channel to which negative (nack) publisher confirmations are sent. The payload is the correlation data defined by the confirm-correlation-expression. Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
31 在设置此项时,如果在毫秒内未收到发布者确认,网关将综合一个负面确认 (nack)。待定确认的检查频率为该值的 50%,因此实际发送 nack 的时间将在该值的 1x 和 1.5x 之间。另请参阅 Alternative Mechanism for Publisher Confirms and Returns。默认为无(不会生成 nack)。
32 When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Also see Alternative Mechanism for Publisher Confirms and Returns. Default none (nacks will not be generated).
33 返回消息所发送到的通道。提供时,底层 AMQP 模板被配置为将不可传递的消息返回到网关。消息由从 AMQP 接收的数据构建,并带有以下其他标头:amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。要求底层 AsyncRabbitTemplate 将其 mandatory 属性设置为 true。另请参阅 Alternative Mechanism for Publisher Confirms and Returns。可选。
34 The channel to which returned messages are sent. When provided, the underlying AMQP template is configured to return undeliverable messages to the gateway. The message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, and amqp_returnRoutingKey. Requires the underlying AsyncRabbitTemplate to have its mandatory property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
35 在设置该项为 false 时,端点在应用程序上下文初始化期间尝试连接到代理。这样做可以使 “fail fast” 通过在代理中断开时记录一条错误消息来检测配置不当。在 true(默认设置)下,连接会在发送首条消息时建立(如果该连接尚未存在,因为其他组件已建立了该连接)。
36 When set to false, the endpoint tries to connect to the broker during application context initialization. Doing so allows “fail fast” detection of bad configuration, by logging an error message if the broker is down. When true (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.

有关更多信息,另请参见 Asynchronous Service Activator

See also Asynchronous Service Activator for more information.

Example 1. RabbitTemplate

在您使用确认和返回时,我们建议将连线到`AsyncRabbitTemplate`的`RabbitTemplate`专门用于此目的。否则,可能会遇到意想不到的副作用。

When you use confirmations and returns, we recommend that the RabbitTemplate wired into the AsyncRabbitTemplate be dedicated. Otherwise, unexpected side effects may be encountered.