请求/回复消息

AmqpTemplate 也提供了多种 sendAndReceive 方法,它们接受与前面描述的单向发送操作(exchangeroutingKeyMessage)相同的参数选项。 这些方法对于请求-回复场景非常有用,因为它们在发送之前处理所需 reply-to 属性的配置,并且可以监听为此目的在内部创建的独占队列上的回复消息。 类似的请求-回复方法也适用于 MessageConverter 应用于请求和回复的情况。 这些方法名为 convertSendAndReceive。 有关更多详细信息,请参阅 {spring-amqp-java-docs}/core/AmqpTemplate.html[AmqpTemplate 的 Javadoc]。 从版本 1.5.0 开始,每个 sendAndReceive 方法变体都有一个接受 CorrelationData 的重载版本。 结合正确配置的连接工厂,这使得操作发送方能够接收发布者确认。 有关更多信息,请参阅 关联的发布者确认和返回 和 {spring-amqp-java-docs}/rabbit/core/RabbitOperations.html[RabbitOperations 的 Javadoc]。 从版本 2.0 开始,这些方法(convertSendAndReceiveAsType)的变体接受一个额外的 ParameterizedTypeReference 参数,用于转换复杂的返回类型。 模板必须配置 SmartMessageConverter。 有关更多信息,请参阅 使用 RabbitTemplateMessage 转换。 从版本 2.1 开始,您可以配置 RabbitTemplatenoLocalReplyConsumer 选项,以控制回复消费者的 noLocal 标志。 此选项默认为 false

回复超时

默认情况下,发送和接收方法在五秒后超时并返回 null。 您可以通过设置 replyTimeout 属性来修改此行为。 从版本 1.5 开始,如果您将 mandatory 属性设置为 true(或者 mandatory-expression 对于特定消息评估为 true),如果消息无法传递到队列,则会抛出 AmqpMessageReturnedException。 此异常具有 returnedMessagereplyCodereplyText 属性,以及用于发送的 exchangeroutingKey

此功能使用发布者返回。 您可以通过在 CachingConnectionFactory 上将 publisherReturns 设置为 true 来启用它(请参阅 发布者确认和返回)。 此外,您不得在 RabbitTemplate 中注册自己的 ReturnCallback

从版本 2.1.2 开始,添加了一个 replyTimedOut 方法,允许子类收到超时通知,以便它们可以清理任何保留的状态。

从版本 2.0.11 和 2.1.3 开始,当您使用默认的 DirectReplyToMessageListenerContainer 时,可以通过设置模板的 replyErrorHandler 属性来添加错误处理器。 此错误处理器将针对任何失败的传递(例如延迟回复和未收到关联头部的消息)进行调用。 传入的异常是 ListenerExecutionFailedException,它具有 failedMessage 属性。

RabbitMQ 直接回复

从版本 3.4.0 开始,RabbitMQ 服务器支持 直接回复。 这消除了固定回复队列的主要原因(避免为每个请求创建临时队列的需要)。 从 Spring AMQP 1.4.1 版本开始,默认使用直接回复(如果服务器支持),而不是创建临时回复队列。 当未提供 replyQueue(或将其名称设置为 amq.rabbitmq.reply-to)时,RabbitTemplate 会自动检测是否支持直接回复,并选择使用它或回退到使用临时回复队列。 使用直接回复时,不需要 reply-listener,也不应进行配置。

回复监听器仍然支持命名队列(amq.rabbitmq.reply-to 除外),允许控制回复并发等。

从版本 1.6 开始,如果您希望为每个回复使用临时、独占、自动删除队列,请将 useTemporaryReplyQueues 属性设置为 true。 如果您设置了 replyAddress,则此属性将被忽略。

您可以通过继承 RabbitTemplate 并重写 useDirectReplyTo() 来更改决定是否使用直接回复的标准,以检查不同的标准。 该方法只在发送第一个请求时调用一次。

在 2.0 版本之前,RabbitTemplate 为每个请求创建一个新的消费者,并在收到回复(或超时)时取消该消费者。 现在,模板改为使用 DirectReplyToMessageListenerContainer,允许消费者被重用。 模板仍然负责关联回复,因此没有延迟回复发送给不同发送者的风险。 如果您想恢复到以前的行为,请将 useDirectReplyToContainer(使用 XML 配置时为 direct-reply-to-container)属性设置为 false。

AsyncRabbitTemplate 没有此选项。 当使用直接回复时,它总是使用 DirectReplyToContainer 进行回复。

从版本 2.3.7 开始,模板新增了一个属性 useChannelForCorrelation。 当此属性为 true 时,服务器无需将关联 ID 从请求消息头部复制到回复消息。 相反,用于发送请求的通道将用于将回复与请求关联起来。

使用回复队列进行消息关联

当使用固定回复队列(amq.rabbitmq.reply-to 除外)时,您必须提供关联数据,以便可以将回复与请求关联起来。 请参阅 RabbitMQ 远程过程调用 (RPC)。 默认情况下,标准 correlationId 属性用于保存关联数据。 但是,如果您希望使用自定义属性来保存关联数据,则可以在 <rabbit-template/> 上设置 correlation-key 属性。 将属性显式设置为 correlationId 与省略该属性相同。 客户端和服务器必须使用相同的头部进行关联数据。

Spring AMQP 1.1 版本使用名为 spring_reply_correlation 的自定义属性来存储此数据。 如果您希望在当前版本中恢复此行为(可能为了保持与使用 1.1 的其他应用程序的兼容性),则必须将属性设置为 spring_reply_correlation

默认情况下,模板会生成自己的关联 ID(忽略任何用户提供的值)。 如果您希望使用自己的关联 ID,请将 RabbitTemplate 实例的 userCorrelationId 属性设置为 true

关联 ID 必须是唯一的,以避免将错误的回复返回给请求的可能性。

回复监听器容器

当使用 RabbitMQ 3.4.0 之前的版本时,每个回复都会使用一个新的临时队列。 但是,可以在模板上配置一个单一的回复队列,这样效率更高,并且还可以让您在该队列上设置参数。 在这种情况下,您还必须提供一个 <reply-listener/> 子元素。 此元素为回复队列提供一个监听器容器,模板作为监听器。 <listener-container/> 元素上允许的所有 消息监听器容器配置 属性都允许在该元素上使用,但 connection-factorymessage-converter 除外,它们继承自模板的配置。

如果您运行应用程序的多个实例或使用多个 RabbitTemplate 实例,则 必须 为每个实例使用唯一的回复队列。 RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,每个实例将争夺回复,并且不一定会收到自己的回复。

以下示例定义了一个带有连接工厂的 Rabbit 模板:

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

虽然容器和模板共享一个连接工厂,但它们不共享一个通道。 因此,请求和回复不会在同一事务中执行(如果事务性)。

在版本 1.5.0 之前,reply-address 属性不可用。 回复总是通过使用默认交换和 reply-queue 名称作为路由键进行路由。 这仍然是默认设置,但您现在可以指定新的 reply-address 属性。 reply-address 可以包含 <exchange>/<routingKey> 形式的地址,并且回复将路由到指定的交换并路由到绑定了路由键的队列。 reply-address 优先于 reply-queue。 当只使用 reply-address 时,<reply-listener> 必须配置为单独的 <listener-container> 组件。 reply-addressreply-queue(或 <listener-container> 上的 queues 属性)在逻辑上必须引用同一个队列。

通过此配置,SimpleListenerContainer 用于接收回复,RabbitTemplate 作为 MessageListener。 当使用 <rabbit:template/> 命名空间元素定义模板时,如前面的示例所示,解析器定义容器并将模板作为监听器连接。

当模板不使用固定的 replyQueue(或正在使用直接回复——请参阅 RabbitMQ 直接回复)时,不需要监听器容器。 直接 reply-to 是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。

如果您将 RabbitTemplate 定义为 <bean/> 或使用 @Configuration 类将其定义为 @Bean,或者当您以编程方式创建模板时,您需要自己定义并连接回复监听器容器。 如果您未能这样做,模板将永远不会收到回复,并最终超时并返回 null 作为 sendAndReceive 方法调用的回复。

从版本 1.5 开始,RabbitTemplate 会检测它是否已配置为 MessageListener 以接收回复。 如果未配置,则尝试发送和接收带回复地址的消息将失败并抛出 IllegalStateException(因为永远不会收到回复)。

此外,如果使用简单的 replyAddress(队列名称),回复监听器容器会验证它是否正在监听具有相同名称的队列。 如果回复地址是交换和路由键,则无法执行此检查,并且会写入调试日志消息。

当您自己连接回复监听器和模板时,务必确保模板的 replyAddress 和容器的 queues(或 queueNames)属性引用同一个队列。 模板将回复地址插入到出站消息的 replyTo 属性中。

以下列表显示了如何手动连接 bean 的示例:

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

一个完整的 RabbitTemplate 示例,它连接了一个固定回复队列,以及一个处理请求并返回回复的“远程”监听器容器,请参阅 此测试用例

当回复超时 (replyTimeout) 时,sendAndReceive() 方法返回 null。

在 1.3.6 版本之前,超时消息的延迟回复只会记录下来。 现在,如果收到延迟回复,它将被拒绝(模板抛出 AmqpRejectAndDontRequeueException)。 如果回复队列配置为将拒绝的消息发送到死信交换,则可以检索回复以供以后分析。 为此,将一个队列绑定到配置的死信交换,其路由键等于回复队列的名称。

有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。 您还可以查看 FixedReplyQueueDeadLetterTests 测试用例以获取示例。

异步 Rabbit 模板

版本 1.6 引入了 AsyncRabbitTemplate。 它具有与 AmqpTemplate 上类似的 sendAndReceive(和 convertSendAndReceive)方法。 然而,它们不是阻塞,而是返回一个 CompletableFuture

sendAndReceive 方法返回 RabbitMessageFutureconvertSendAndReceive 方法返回 RabbitConverterFuture

您可以稍后通过调用 future 上的 get() 同步检索结果,也可以注册一个回调,该回调将异步地调用结果。 以下列表显示了两种方法:

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果设置了 mandatory 并且消息无法传递,Future 将抛出 ExecutionException,其原因是 AmqpMessageReturnedException,它封装了返回的消息和有关返回的信息。

如果设置了 enableConfirms,则 Future 具有一个名为 confirm 的属性,它本身是一个 CompletableFuture<Boolean>,其中 true 表示发布成功。 如果确认 Future 为 false,则 RabbitFuture 具有一个名为 nackCause 的附加属性,其中包含失败的原因(如果可用)。

如果发布者确认在回复之后收到,则会被丢弃,因为回复意味着成功发布。

您可以设置模板的 receiveTimeout 属性来使回复超时(默认为 30000 - 30 秒)。 如果发生超时,Future 将以 AmqpReplyTimeoutException 完成。

模板实现了 SmartLifecycle。 当存在待处理的回复时停止模板会导致待处理的 Future 实例被取消。

从版本 2.0 开始,异步模板现在支持 直接回复,而不是配置的回复队列。 要启用此功能,请使用以下构造函数之一:

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

请参阅 RabbitMQ 直接回复,以将直接回复与同步 RabbitTemplate 一起使用。

版本 2.0 引入了这些方法的变体(convertSendAndReceiveAsType),它们接受一个额外的 ParameterizedTypeReference 参数,用于转换复杂的返回类型。 您必须使用 SmartMessageConverter 配置底层 RabbitTemplate。 有关更多信息,请参阅 使用 RabbitTemplateMessage 转换

带有 AMQP 的 Spring Remoting

Spring Remoting 不再受支持,因为该功能已从 Spring Framework 中移除。

请改用 RabbitTemplate(客户端)的 sendAndReceive 操作和 @RabbitListener