请求/回复消息
AmqpTemplate
也提供了多种 sendAndReceive
方法,它们接受与前面描述的单向发送操作(exchange
、routingKey
和 Message
)相同的参数选项。
这些方法对于请求-回复场景非常有用,因为它们在发送之前处理所需 reply-to
属性的配置,并且可以监听为此目的在内部创建的独占队列上的回复消息。
类似的请求-回复方法也适用于 MessageConverter
应用于请求和回复的情况。
这些方法名为 convertSendAndReceive
。
有关更多详细信息,请参阅 {spring-amqp-java-docs}/core/AmqpTemplate.html[AmqpTemplate
的 Javadoc]。
从版本 1.5.0 开始,每个 sendAndReceive
方法变体都有一个接受 CorrelationData
的重载版本。
结合正确配置的连接工厂,这使得操作发送方能够接收发布者确认。
有关更多信息,请参阅 关联的发布者确认和返回 和 {spring-amqp-java-docs}/rabbit/core/RabbitOperations.html[RabbitOperations
的 Javadoc]。
从版本 2.0 开始,这些方法(convertSendAndReceiveAsType
)的变体接受一个额外的 ParameterizedTypeReference
参数,用于转换复杂的返回类型。
模板必须配置 SmartMessageConverter
。
有关更多信息,请参阅 使用 RabbitTemplate
从 Message
转换。
从版本 2.1 开始,您可以配置 RabbitTemplate
的 noLocalReplyConsumer
选项,以控制回复消费者的 noLocal
标志。
此选项默认为 false
。
回复超时
默认情况下,发送和接收方法在五秒后超时并返回 null。
您可以通过设置 replyTimeout
属性来修改此行为。
从版本 1.5 开始,如果您将 mandatory
属性设置为 true
(或者 mandatory-expression
对于特定消息评估为 true
),如果消息无法传递到队列,则会抛出 AmqpMessageReturnedException
。
此异常具有 returnedMessage
、replyCode
和 replyText
属性,以及用于发送的 exchange
和 routingKey
。
此功能使用发布者返回。
您可以通过在 |
从版本 2.1.2 开始,添加了一个 replyTimedOut
方法,允许子类收到超时通知,以便它们可以清理任何保留的状态。
从版本 2.0.11 和 2.1.3 开始,当您使用默认的 DirectReplyToMessageListenerContainer
时,可以通过设置模板的 replyErrorHandler
属性来添加错误处理器。
此错误处理器将针对任何失败的传递(例如延迟回复和未收到关联头部的消息)进行调用。
传入的异常是 ListenerExecutionFailedException
,它具有 failedMessage
属性。
RabbitMQ 直接回复
从版本 3.4.0 开始,RabbitMQ 服务器支持 直接回复。
这消除了固定回复队列的主要原因(避免为每个请求创建临时队列的需要)。
从 Spring AMQP 1.4.1 版本开始,默认使用直接回复(如果服务器支持),而不是创建临时回复队列。
当未提供 replyQueue
(或将其名称设置为 amq.rabbitmq.reply-to
)时,RabbitTemplate
会自动检测是否支持直接回复,并选择使用它或回退到使用临时回复队列。
使用直接回复时,不需要 reply-listener
,也不应进行配置。
回复监听器仍然支持命名队列(amq.rabbitmq.reply-to
除外),允许控制回复并发等。
从版本 1.6 开始,如果您希望为每个回复使用临时、独占、自动删除队列,请将 useTemporaryReplyQueues
属性设置为 true
。
如果您设置了 replyAddress
,则此属性将被忽略。
您可以通过继承 RabbitTemplate
并重写 useDirectReplyTo()
来更改决定是否使用直接回复的标准,以检查不同的标准。
该方法只在发送第一个请求时调用一次。
在 2.0 版本之前,RabbitTemplate
为每个请求创建一个新的消费者,并在收到回复(或超时)时取消该消费者。
现在,模板改为使用 DirectReplyToMessageListenerContainer
,允许消费者被重用。
模板仍然负责关联回复,因此没有延迟回复发送给不同发送者的风险。
如果您想恢复到以前的行为,请将 useDirectReplyToContainer
(使用 XML 配置时为 direct-reply-to-container
)属性设置为 false。
AsyncRabbitTemplate
没有此选项。
当使用直接回复时,它总是使用 DirectReplyToContainer
进行回复。
从版本 2.3.7 开始,模板新增了一个属性 useChannelForCorrelation
。
当此属性为 true
时,服务器无需将关联 ID 从请求消息头部复制到回复消息。
相反,用于发送请求的通道将用于将回复与请求关联起来。
使用回复队列进行消息关联
当使用固定回复队列(amq.rabbitmq.reply-to
除外)时,您必须提供关联数据,以便可以将回复与请求关联起来。
请参阅 RabbitMQ 远程过程调用 (RPC)。
默认情况下,标准 correlationId
属性用于保存关联数据。
但是,如果您希望使用自定义属性来保存关联数据,则可以在 <rabbit-template/>
上设置 correlation-key
属性。
将属性显式设置为 correlationId
与省略该属性相同。
客户端和服务器必须使用相同的头部进行关联数据。
Spring AMQP 1.1 版本使用名为 |
默认情况下,模板会生成自己的关联 ID(忽略任何用户提供的值)。
如果您希望使用自己的关联 ID,请将 RabbitTemplate
实例的 userCorrelationId
属性设置为 true
。
关联 ID 必须是唯一的,以避免将错误的回复返回给请求的可能性。
回复监听器容器
当使用 RabbitMQ 3.4.0 之前的版本时,每个回复都会使用一个新的临时队列。
但是,可以在模板上配置一个单一的回复队列,这样效率更高,并且还可以让您在该队列上设置参数。
在这种情况下,您还必须提供一个 <reply-listener/>
子元素。
此元素为回复队列提供一个监听器容器,模板作为监听器。
<listener-container/>
元素上允许的所有 消息监听器容器配置 属性都允许在该元素上使用,但 connection-factory
和 message-converter
除外,它们继承自模板的配置。
如果您运行应用程序的多个实例或使用多个 RabbitTemplate
实例,则 必须 为每个实例使用唯一的回复队列。
RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,每个实例将争夺回复,并且不一定会收到自己的回复。
以下示例定义了一个带有连接工厂的 Rabbit 模板:
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们不共享一个通道。 因此,请求和回复不会在同一事务中执行(如果事务性)。
在版本 1.5.0 之前, |
通过此配置,SimpleListenerContainer
用于接收回复,RabbitTemplate
作为 MessageListener
。
当使用 <rabbit:template/>
命名空间元素定义模板时,如前面的示例所示,解析器定义容器并将模板作为监听器连接。
当模板不使用固定的 |
如果您将 RabbitTemplate
定义为 <bean/>
或使用 @Configuration
类将其定义为 @Bean
,或者当您以编程方式创建模板时,您需要自己定义并连接回复监听器容器。
如果您未能这样做,模板将永远不会收到回复,并最终超时并返回 null 作为 sendAndReceive
方法调用的回复。
从版本 1.5 开始,RabbitTemplate
会检测它是否已配置为 MessageListener
以接收回复。
如果未配置,则尝试发送和接收带回复地址的消息将失败并抛出 IllegalStateException
(因为永远不会收到回复)。
此外,如果使用简单的 replyAddress
(队列名称),回复监听器容器会验证它是否正在监听具有相同名称的队列。
如果回复地址是交换和路由键,则无法执行此检查,并且会写入调试日志消息。
当您自己连接回复监听器和模板时,务必确保模板的 replyAddress
和容器的 queues
(或 queueNames
)属性引用同一个队列。
模板将回复地址插入到出站消息的 replyTo
属性中。
以下列表显示了如何手动连接 bean 的示例:
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一个完整的 RabbitTemplate
示例,它连接了一个固定回复队列,以及一个处理请求并返回回复的“远程”监听器容器,请参阅 此测试用例。
当回复超时 (replyTimeout
) 时,sendAndReceive()
方法返回 null。
在 1.3.6 版本之前,超时消息的延迟回复只会记录下来。
现在,如果收到延迟回复,它将被拒绝(模板抛出 AmqpRejectAndDontRequeueException
)。
如果回复队列配置为将拒绝的消息发送到死信交换,则可以检索回复以供以后分析。
为此,将一个队列绑定到配置的死信交换,其路由键等于回复队列的名称。
有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。
您还可以查看 FixedReplyQueueDeadLetterTests
测试用例以获取示例。
异步 Rabbit 模板
版本 1.6 引入了 AsyncRabbitTemplate
。
它具有与 AmqpTemplate
上类似的 sendAndReceive
(和 convertSendAndReceive
)方法。
然而,它们不是阻塞,而是返回一个 CompletableFuture
。
sendAndReceive
方法返回 RabbitMessageFuture
。
convertSendAndReceive
方法返回 RabbitConverterFuture
。
您可以稍后通过调用 future 上的 get()
同步检索结果,也可以注册一个回调,该回调将异步地调用结果。
以下列表显示了两种方法:
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果设置了 mandatory
并且消息无法传递,Future 将抛出 ExecutionException
,其原因是 AmqpMessageReturnedException
,它封装了返回的消息和有关返回的信息。
如果设置了 enableConfirms
,则 Future 具有一个名为 confirm
的属性,它本身是一个 CompletableFuture<Boolean>
,其中 true
表示发布成功。
如果确认 Future 为 false
,则 RabbitFuture
具有一个名为 nackCause
的附加属性,其中包含失败的原因(如果可用)。
如果发布者确认在回复之后收到,则会被丢弃,因为回复意味着成功发布。
您可以设置模板的 receiveTimeout
属性来使回复超时(默认为 30000
- 30 秒)。
如果发生超时,Future 将以 AmqpReplyTimeoutException
完成。
模板实现了 SmartLifecycle
。
当存在待处理的回复时停止模板会导致待处理的 Future
实例被取消。
从版本 2.0 开始,异步模板现在支持 直接回复,而不是配置的回复队列。 要启用此功能,请使用以下构造函数之一:
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参阅 RabbitMQ 直接回复,以将直接回复与同步 RabbitTemplate
一起使用。
版本 2.0 引入了这些方法的变体(convertSendAndReceiveAsType
),它们接受一个额外的 ParameterizedTypeReference
参数,用于转换复杂的返回类型。
您必须使用 SmartMessageConverter
配置底层 RabbitTemplate
。
有关更多信息,请参阅 使用 RabbitTemplate
从 Message
转换。