出站通道适配器
以下示例显示了 AMQP 出站通道适配器的可用属性:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
MessageChannel amqpOutboundChannel) {
return IntegrationFlow.from(amqpOutboundChannel)
.handle(Amqp.outboundAdapter(amqpTemplate)
.routingKey("queue1")) // default exchange - route to queue 'queue1'
.get();
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
return outbound;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
<int-amqp:outbound-channel-adapter id="outboundAmqp" [id="CO1-1"]1
channel="outboundChannel" [id="CO1-2"]2
amqp-template="myAmqpTemplate" [id="CO1-3"]3
exchange-name="" [id="CO1-4"]4
exchange-name-expression="" [id="CO1-5"]5
order="1" [id="CO1-6"]6
routing-key="" [id="CO1-7"]7
routing-key-expression="" [id="CO1-8"]8
default-delivery-mode"" [id="CO1-9"]9
confirm-correlation-expression="" [id="CO1-10"]10
confirm-ack-channel="" [id="CO1-11"]11
confirm-nack-channel="" [id="CO1-12"]12
confirm-timeout="" [id="CO1-13"]13
wait-for-confirm="" [id="CO1-14"]14
return-channel="" [id="CO1-15"]15
error-message-strategy="" [id="CO1-16"]16
header-mapper="" [id="CO1-17"]17
mapped-request-headers="" [id="CO1-18"]18
lazy-connect="true" [id="CO1-19"]19
multi-send="false"/> [id="CO1-20"]20
<1> 此适配器的唯一 ID。 可选。 <1> 消息通道,消息应发送到该通道,以便将其转换并发布到 AMQP 交换机。 必填。 <1> 对已配置的 AMQP 模板的 Bean 引用。 可选(默认为 `amqpTemplate`)。 <1> 消息发送到的 AMQP 交换机名称。 如果未提供,消息将发送到默认的无名交换机。 与 'exchange-name-expression' 互斥。 可选。 <1> 一个 SpEL 表达式,用于确定消息发送到的 AMQP 交换机名称,其中消息是根对象。 如果未提供,消息将发送到默认的无名交换机。 与 'exchange-name' 互斥。 可选。 <1> 当注册了多个消费者时,此消费者的顺序,从而实现负载均衡和故障转移。 可选(默认为 `Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]`)。 <1> 发送消息时使用的固定路由键。 默认情况下,这是一个空 `String`。 与 'routing-key-expression' 互斥。 可选。 <1> 一个 SpEL 表达式,用于确定发送消息时使用的路由键,其中消息是根对象(例如,'payload.key')。 默认情况下,这是一个空 `String`。 与 'routing-key' 互斥。 可选。 <1> 消息的默认投递模式:`PERSISTENT` 或 `NON_PERSISTENT`。 如果 `header-mapper` 设置了投递模式,则此值会被覆盖。 如果存在 Spring Integration 消息头 `amqp_deliveryMode`,则 `DefaultHeaderMapper` 会设置该值。 如果未提供此属性且消息头映射器未设置它,则默认值取决于 `RabbitTemplate` 使用的底层 Spring AMQP `MessagePropertiesConverter`。 如果完全没有定制,则默认值为 `PERSISTENT`。 可选。 <1> 定义关联数据的表达式。 提供时,这将配置底层 AMQP 模板以接收发布者确认。 需要专用的 `RabbitTemplate` 和一个 `CachingConnectionFactory`,其 `publisherConfirms` 属性设置为 `true`。 当收到发布者确认并提供关联数据时,它会根据确认类型写入 `confirm-ack-channel` 或 `confirm-nack-channel`。 确认的有效负载是关联数据,由该表达式定义。 消息有一个 'amqp_publishConfirm' 消息头,设置为 `true` (`ack`) 或 `false` (`nack`)。 示例:`headers['myCorrelationData']` 和 `payload`。 版本 4.1 引入了 `amqp_publishConfirmNackCause` 消息头。 它包含发布者确认的 'nack' 的 `cause`。 从版本 4.2 开始,如果表达式解析为 `Message<?>` 实例(例如 `#this`),则在 `ack`/`nack` 通道上发出的消息将基于该消息,并添加额外的消息头。 以前,无论类型如何,都会创建一个新消息,并将关联数据作为其有效负载。 另请参见 xref:amqp/alternative-confirms-returns.adoc[发布者确认和返回的替代机制]。 可选。 <1> 接收肯定 (`ack`) 发布者确认的通道。 有效负载是 `confirm-correlation-expression` 定义的关联数据。 如果表达式是 `#root` 或 `#this`,则消息将根据原始消息构建,并设置 `amqp_publishConfirm` 消息头为 `true`。 另请参见 xref:amqp/alternative-confirms-returns.adoc[发布者确认和返回的替代机制]。 可选(默认为 `nullChannel`)。 <1> 接收否定 (`nack`) 发布者确认的通道。 有效负载是 `confirm-correlation-expression` 定义的关联数据(如果没有配置 `ErrorMessageStrategy`)。 如果表达式是 `#root` 或 `#this`,则消息将根据原始消息构建,并设置 `amqp_publishConfirm` 消息头为 `false`。 当存在 `ErrorMessageStrategy` 时,消息是一个 `ErrorMessage`,其有效负载为 `NackedAmqpMessageException`。 另请参见 xref:amqp/alternative-confirms-returns.adoc[发布者确认和返回的替代机制]。 可选(默认为 `nullChannel`)。 <1> 设置后,如果在此毫秒时间内未收到发布者确认,适配器将合成一个否定确认 (nack)。 挂起的确认会在此值的 50% 检查一次,因此发送 nack 的实际时间将介于此值的 1 倍到 1.5 倍之间。 另请参见 xref:amqp/alternative-confirms-returns.adoc[发布者确认和返回的替代机制]。 默认无(不会生成 nack)。 <1> 当设置为 true 时,调用线程将阻塞,等待发布者确认。 这需要配置了确认的 `RabbitTemplate` 以及 `confirm-correlation-expression`。 线程将阻塞长达 `confirm-timeout`(或默认 5 秒)。 如果发生超时,将抛出 `MessageTimeoutException`。 如果启用了返回并且消息被返回,或者在等待确认时发生任何其他异常,将抛出 `MessageHandlingException`,并附带适当的消息。 <1> 返回消息发送到的通道。 提供时,底层 AMQP 模板配置为将无法投递的消息返回给适配器。 当没有配置 `ErrorMessageStrategy` 时,消息是根据从 AMQP 接收到的数据构建的,并带有以下附加消息头:`amqp_returnReplyCode`、`amqp_returnReplyText`、`amqp_returnExchange`、`amqp_returnRoutingKey`。 当存在 `ErrorMessageStrategy` 时,消息是一个 `ErrorMessage`,其有效负载为 `ReturnedAmqpMessageException`。 另请参见 xref:amqp/alternative-confirms-returns.adoc[发布者确认和返回的替代机制]。 可选。 <1> 对 `ErrorMessageStrategy` 实现的引用,用于在发送返回或否定确认消息时构建 `ErrorMessage` 实例。 <1> 对 `AmqpHeaderMapper` 的引用,用于发送 AMQP 消息。 默认情况下,只有标准 AMQP 属性(如 `contentType`)被复制到 Spring Integration `MessageHeaders`。 任何用户定义的头都不会被默认的 `DefaultAmqpHeaderMapper` 复制到消息中。 如果提供了 'request-header-names',则不允许使用。 可选。 <1> 以逗号分隔的 AMQP 消息头名称列表,这些消息头将从 `MessageHeaders` 映射到 AMQP 消息。 如果提供了 'header-mapper' 引用,则不允许使用。 此列表中的值也可以是与消息头名称匹配的简单模式(例如 `"*"` 或 `"thing1*, thing2"` 或 `"*thing1"`)。 <1> 当设置为 `false` 时,端点会在应用程序上下文初始化期间尝试连接到代理。 这允许“快速失败”检测错误配置,但如果代理关闭,也会导致初始化失败。 当设置为 `true`(默认值)时,连接会在发送第一条消息时建立(如果由于其他组件建立而尚未存在)。 <1> 当设置为 `true` 时,类型为 `Iterable<Message<?>>` 的有效负载将在单个 `RabbitTemplate` 调用范围内,作为离散消息在同一通道上发送。 需要一个 `RabbitTemplate`。 当 `wait-for-confirms` 为 true 时,在消息发送后会调用 `RabbitTemplate.waitForConfirmsOrDie()`。 使用事务模板时,发送将在新事务中执行,或在已启动的事务中执行(如果存在)。
Example 1. return-channel
使用 return-channel
需要一个 RabbitTemplate
,其 mandatory
属性设置为 true
,以及一个 CachingConnectionFactory
,其 publisherReturns
属性设置为 true
。
当使用多个具有返回功能的出站端点时,每个端点都需要一个单独的 RabbitTemplate
。