注解驱动的监听器端点

接收异步消息最简单的方法是使用注解驱动的监听器端点基础设施。 简而言之,它允许您将托管 bean 的方法公开为 Rabbit 监听器端点。 以下示例展示了如何使用 @RabbitListener 注解:

@Component
public class MyService {

    @RabbitListener(queues = "myQueue")
    public void processOrder(String data) {
        ...
    }

}

上述示例的目的是,每当 myQueue 队列中有消息可用时,processOrder 方法就会相应地被调用(在这种情况下,使用消息的有效负载)。

注解驱动的端点基础设施会为每个注解方法在后台创建一个消息监听器容器,它通过使用 RabbitListenerContainerFactory 来实现。

在前面的示例中,myQueue 必须已经存在并绑定到某个交换器。 只要应用程序上下文中存在 RabbitAdmin,队列就可以自动声明和绑定。

注解属性(queues 等)可以指定属性占位符(${some.property})或 SpEL 表达式(#{someExpression})。 有关为何使用 SpEL 而非属性占位符的示例,请参阅 监听多个队列。 以下列表显示了声明 Rabbit 监听器的三个示例:

@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(Order order) {
    ...
  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(Invoice invoice) {
    ...
  }

  @RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
  public String handleWithSimpleDeclare(String data) {
      ...
  }

}

在第一个示例中,如果需要,队列 myQueue 会与交换器一起自动声明(持久化),并使用路由键绑定到交换器。 在第二个示例中,声明并绑定了一个匿名(独占、自动删除)队列;队列名称由框架使用 Base64UrlNamingStrategy 创建。 您不能使用此技术声明代理命名的队列;它们需要声明为 bean 定义;请参阅 容器和代理命名的队列。 可以提供多个 QueueBinding 条目,让监听器监听多个队列。 在第三个示例中,如果需要,会声明一个名称从属性 my.queue 获取的队列,并使用队列名称作为路由键将其默认绑定到默认交换器。

自 2.0 版本起,@Exchange 注解支持任何交换器类型,包括自定义类型。 欲了解更多信息,请参阅 AMQP 概念

当您需要更高级的配置时,可以使用普通的 @Bean 定义。

请注意第一个示例中交换器上的 ignoreDeclarationExceptions。 这允许,例如,绑定到一个可能具有不同设置(例如 internal)的现有交换器。 默认情况下,现有交换器的属性必须匹配。

从 2.0 版本开始,您现在可以将队列与多个路由键绑定到交换器,如下例所示:

...
    key = { "red", "yellow" }
...

您还可以在 @QueueBinding 注解中为队列、交换器和绑定指定参数,如下例所示:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "auto.headers", autoDelete = "true",
                        arguments = @Argument(name = "x-message-ttl", value = "10000",
                                                type = "java.lang.Integer")),
        exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
        arguments = {
                @Argument(name = "x-match", value = "all"),
                @Argument(name = "thing1", value = "somevalue"),
                @Argument(name = "thing2")
        })
)
public String handleWithHeadersExchange(String foo) {
    ...
}

请注意,队列的 x-message-ttl 参数设置为 10 秒。 由于参数类型不是 String,我们必须指定其类型——在本例中为 Integer。 与所有此类声明一样,如果队列已经存在,则参数必须与队列上的参数匹配。 对于头部交换器,我们将绑定参数设置为匹配具有 thing1 头部且设置为 somevalue 的消息,并且 thing2 头部必须存在且具有任何值。 x-match 参数表示两个条件都必须满足。

参数名称、值和类型可以是属性占位符(${…​})或 SpEL 表达式(#{…​})。 name 必须解析为 Stringtype 的表达式必须解析为 Class 或类的完全限定名称。 value 必须解析为可以通过 DefaultConversionService 转换为该类型的值(例如前面示例中的 x-message-ttl)。

如果名称解析为 null 或空 String,则该 @Argument 将被忽略。