Outbound Gateway

以下清单显示了 AMQP 输出网关的可能属性:

The following listing shows the possible properties for an AMQP Outbound Gateway:

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
    return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
                    .routingKey("foo")) // default exchange - route to queue 'foo'
            .get();
}

@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {

    String sendToRabbit(String data);

}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setExpectReply(true);
    outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {

    String sendToRabbit(String data);

}
<int-amqp:outbound-gateway id="outboundGateway"               1
                           request-channel="myRequestChannel" 2
                           amqp-template=""                   3
                           exchange-name=""                   4
                           exchange-name-expression=""        5
                           order="1"                          6
                           reply-channel=""                   7
                           reply-timeout=""                   8
                           requires-reply=""                  9
                           routing-key=""                     10
                           routing-key-expression=""          11
                           default-delivery-mode""            12
                           confirm-correlation-expression=""  13
                           confirm-ack-channel=""             14
                           confirm-nack-channel=""            15
                           confirm-timeout=""                 16
                           return-channel=""                  17
                           error-message-strategy=""          18
                           lazy-connect="true" />             19
1 此适配器的唯一 ID。可选。
2 The unique ID for this adapter. Optional.
3 消息发送到的消息通道,以便将其转换并发布到 AMQP 交换。必需。
4 Message channel to which messages are sent to have them converted and published to an AMQP exchange. Required.
5 配置的 AMQP 模板的 Bean 引用。可选(默认为 amqpTemplate)。
6 Bean reference to the configured AMQP template. Optional (defaults to amqpTemplate).
7 应发送消息到的 AMQP 交换的名称。如果未提供,则会将消息发送到默认的无名称交换。与“exchange-name-expression”互斥。可选。
8 The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name cxchange. Mutually exclusive with 'exchange-name-expression'. Optional.
9 用作根对象的 SpEL 表达式,经评估用以确定应向其发送消息的 AMQP 交换的名称。如果未提供,则会将消息发送到默认的无名称交换。与“exchange-name”互斥。可选。
10 A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages should be sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional.
11 当多个使用者已注册时的此使用者顺序,从而启用负载平衡和故障转移。可选(默认为 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
12 The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover. Optional (defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
13 从 AMQP 队列接收并转换后,应发送响应到的消息通道。可选。
14 Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional.
15 网关在向 reply-channel`发送响应消息时等待的时间。仅适用于 `reply-channel`可以阻塞(例如一个容量限制当前已满的 `QueueChannel)。默认为无穷大。
16 The time the gateway waits when sending the reply message to the reply-channel. This only applies if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full. Defaults to infinity.
17 true,如果在 AmqpTemplate&#8217;s `replyTimeout 属性中未收到答复消息,则网关会抛出异常。默认为 true
18 When true, the gateway throws an exception if no reply message is received within the AmqpTemplate’s `replyTimeout property. Defaults to true.
19 发送消息时要使用的 routing-key。默认情况下,这是一个空的 String。与“路由键表达式”互斥。可选。
20 The routing-key to use when sending messages. By default, this is an empty String. Mutually exclusive with 'routing-key-expression'. Optional.
21 一个 SpEL 表达式,可对其求值以确定在发送消息时要使用的 routing-key,并使用消息作为根对象(例如,“payload.key”)。默认情况下,这是一个空的 String。与“路由键”互斥。可选。
22 A SpEL expression that is evaluated to determine the routing-key to use when sending messages, with the message as the root object (for example, 'payload.key'). By default, this is an empty String. Mutually exclusive with 'routing-key'. Optional.
23 消息的默认传递模式:PERSISTENTNON_PERSISTENT。如果 header-mapper 设置了传递模式,则会覆盖。如果存在 Spring Integration 消息头 amqp_deliveryMode,则 DefaultHeaderMapper 会设置值。如果没有提供此属性并且头映射器未将其设置,则默认值取决于 RabbitTemplate 使用的底层 Spring AMQP MessagePropertiesConverter。如果根本没有对其进行自定义,则默认值为 PERSISTENT。可选。
24 The default delivery mode for messages: PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. If the Spring Integration message header amqp_deliveryMode is present, the DefaultHeaderMapper sets the value. If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate. If that is not customized at all, the default is PERSISTENT. Optional.
25 自 4.2 版起。定义相关数据的一个表达式。提供时,它将配置底层 AMQP 模板以接收发布者确认。需要一个专用的 RabbitTemplate 和一个 CachingConnectionFactory,其 publisherConfirms 的属性设置为 true。收到发布者确认并且提供了相关数据时,会根据确认类型将其写入 confirm-ack-channelconfirm-nack-channel。确认的有效负载是相关数据,由这个表达式定义。消息有一个头“amqp_publishConfirm”,设置为 trueack)或 falsenack)。对于 nack 确认,Spring Integration 提供了一个附加头 amqp_publishConfirmNackCause。示例:headers['myCorrelationData']payload。如果表达式解析为 Message&lt;?&gt; 实例(例如,#this),则在 ack/nack 通道上发出的消息基于该消息,并添加附加头。以前,会创建一个新的消息,其相关数据作为有效负载,无论类型如何。另请参阅 Alternative Mechanism for Publisher Confirms and Returns。可选。
26 Since version 4.2. An expression defining correlation data. When provided, this configures the underlying AMQP template to receive publisher confirms. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with the publisherConfirms property set to true. When a publisher confirm is received and correlation data is supplied, it is written to either the confirm-ack-channel or the confirm-nack-channel, depending on the confirmation type. The payload of the confirm is the correlation data, as defined by this expression. The message has a header 'amqp_publishConfirm' set to true (ack) or false (nack). For nack confirmations, Spring Integration provides an additional header amqp_publishConfirmNackCause. Examples: headers['myCorrelationData'] and payload. If the expression resolves to a Message<?> instance (such as #this), the message emitted on the ack/nack channel is based on that message, with the additional headers added. Previously, a new message was created with the correlation data as its payload, regardless of type. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
27 将正向(ack)发布者确认发送的通道。负载是 confirm-correlation-expression 定义的相关性数据。如果表达式为 #root#this,则该消息将从原始消息构建,其中 amqp_publishConfirm 标头设置为 true。另请参见 Alternative Mechanism for Publisher Confirms and Returns。可选(默认值为 nullChannel)。
28 The channel to which positive (ack) publisher confirmations are sent. The payload is the correlation data defined by confirm-correlation-expression. If the expression is #root or #this, the message is built from the original message, with the amqp_publishConfirm header set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
29 将负向(nack)发布者确认发送的通道。如果未配置 ErrorMessageStrategy,则负载是 confirm-correlation-expression 定义的相关性数据。如果表达式为 #root#this,则该消息将从原始消息构建,其中 amqp_publishConfirm 标头设置为 false。存在 ErrorMessageStrategy 时,该消息为 ErrorMessage,并带有 NackedAmqpMessageException 负载。另请参见 Alternative Mechanism for Publisher Confirms and Returns。可选(默认值为 nullChannel)。
30 The channel to which negative (nack) publisher confirmations are sent. The payload is the correlation data defined by confirm-correlation-expression (if there is no ErrorMessageStrategy configured). If the expression is #root or #this, the message is built from the original message, with the amqp_publishConfirm header set to false. When there is an ErrorMessageStrategy, the message is an ErrorMessage with a NackedAmqpMessageException payload. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
31 设置后,如果在毫秒数内未收到发布者确认,网关会合成一个负面确认(nack)。待处理的确认每 50% 该值检查一次,因此发送 nack 的实际时间将介于此值的 1x 和 1.5x 之间。默认值为无(不会生成 nack)。
32 When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Default none (nacks will not be generated).
33 将返回的消息发送到的通道。提供时,底层 AMQP 模板会被配置为将无法投递的消息返回给适配器。当未配置 ErrorMessageStrategy 时,该消息由 AMQP 收到的数据构建,并带有以下附加标头:amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。存在 ErrorMessageStrategy 时,该消息为 ErrorMessage,并带有 ReturnedAmqpMessageException 负载。另请参见 Alternative Mechanism for Publisher Confirms and Returns。可选。
34 The channel to which returned messages are sent. When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter. When there is no ErrorMessageStrategy configured, the message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, and amqp_returnRoutingKey. When there is an ErrorMessageStrategy, the message is an ErrorMessage with a ReturnedAmqpMessageException payload. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
35 ErrorMessageStrategy 实现的引用,用于在发送返回的消息或未被证实的消息时构建 ErrorMessage 实例。
36 A reference to an ErrorMessageStrategy implementation used to build ErrorMessage instances when sending returned or negatively acknowledged messages.
37 设置为 false 时,端点会在应用程序上下文初始化期间尝试连接到代理。如果代理不可用,这将允许 “fail fast” 通过记录错误消息来检测错误配置。当 true(默认值)时,连接会在发送第一条消息时建立(如果尚未存在,因为其他某个组件已建立)。
38 When set to false, the endpoint attempts to connect to the broker during application context initialization. This allows “fail fast” detection of bad configuration by logging an error message if the broker is down. When true (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.
Example 1. return-channel

使用 return-channel 需要将 RabbitTemplatemandatory 属性设置为 true,并将 CachingConnectionFactorypublisherReturns 属性设置为 true。当使用带有返回的多个出站端点时,每个端点都需要一个单独的 RabbitTemplate

Using a return-channel requires a RabbitTemplate with the mandatory property set to true and a CachingConnectionFactory with the publisherReturns property set to true. When using multiple outbound endpoints with returns, a separate RabbitTemplate is needed for each endpoint.

基础 AmqpTemplate 预设的默认 replyTimeout 为五秒。如果您需要更长的超时,必须在 template 中对其进行配置。

The underlying AmqpTemplate has a default replyTimeout of five seconds. If you require a longer timeout, you must configure it on the template.

请注意,出站适配器和出站网关配置唯一的区别在于 expectReply 属性的设置。

Note that the only difference between the outbound adapter and outbound gateway configuration is the setting of the expectReply property.