启用监听器端点注解

要启用对 @RabbitListener 注解的支持,您可以在您的一个 @Configuration 类中添加 @EnableRabbit。 以下示例展示了如何实现:

@Configuration
@EnableRabbit
public class AppConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setContainerCustomizer(container -> /* customize the container */);
        return factory;
    }
}

自 2.0 版本起,DirectMessageListenerContainerFactory 也可用。 它创建 DirectMessageListenerContainer 实例。

有关 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory 之间选择的信息,请参阅 选择容器

从 2.2.2 版本开始,您可以提供一个 ContainerCustomizer 实现(如上所示)。 这可用于在容器创建和配置后进一步配置容器;例如,您可以使用它来设置容器工厂未公开的属性。

2.4.8 版本提供了 CompositeContainerCustomizer,适用于您希望应用多个定制器的情况。

默认情况下,基础设施会查找名为 rabbitListenerContainerFactory 的 bean,作为创建消息监听器容器的工厂来源。 在这种情况下,如果忽略 RabbitMQ 基础设施设置,processOrder 方法可以以三个线程的核心池大小和十个线程的最大池大小被调用。

您可以为每个注解定制要使用的监听器容器工厂,或者通过实现 RabbitListenerConfigurer 接口来配置一个显式默认值。 仅当至少有一个端点未注册特定容器工厂时才需要默认值。 有关完整的详细信息和示例,请参阅 {spring-amqp-java-docs}/rabbit/annotation/RabbitListenerConfigurer.html[Javadoc]。

容器工厂提供了添加 MessagePostProcessor 实例的方法,这些实例在接收消息后(调用监听器之前)和发送回复之前应用。

有关回复的信息,请参阅 回复管理

从 2.0.6 版本开始,您可以向监听器容器工厂添加 RetryTemplateRecoveryCallback。 它在发送回复时使用。 当重试耗尽时,会调用 RecoveryCallback。 您可以使用 SendRetryContextAccessor 从上下文中获取信息。 以下示例展示了如何实现:

factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
    Message failed = SendRetryContextAccessor.getMessage(ctx);
    Address replyTo = SendRetryContextAccessor.getAddress(ctx);
    Throwable t = ctx.getLastThrowable();
    ...
    return null;
});

如果您更喜欢 XML 配置,可以使用 <rabbit:annotation-driven> 元素。 任何使用 @RabbitListener 注解的 bean 都会被检测到。

对于 SimpleRabbitListenerContainer 实例,您可以使用类似于以下的 XML:

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="concurrentConsumers" value="3"/>
    <property name="maxConcurrentConsumers" value="10"/>
</bean>

对于 DirectMessageListenerContainer 实例,您可以使用类似于以下的 XML:

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="consumersPerQueue" value="3"/>
</bean>

从 2.0 版本开始,@RabbitListener 注解具有 concurrency 属性。 它支持 SpEL 表达式 (#{…​}) 和属性占位符 (${…​})。 其含义和允许的值取决于容器类型,如下所示:

  • 对于 DirectMessageListenerContainer,该值必须是一个整数值,它设置容器上的 consumersPerQueue 属性。

  • 对于 SimpleRabbitListenerContainer,该值可以是一个整数值,它设置容器上的 concurrentConsumers 属性,或者它可以具有 m-n 的形式,其中 mconcurrentConsumers 属性,nmaxConcurrentConsumers 属性。

无论哪种情况,此设置都将覆盖工厂上的设置。 以前,如果监听器需要不同的并发性,您必须定义不同的容器工厂。

该注解还允许通过 autoStartupexecutor(自 2.2 起)注解属性覆盖工厂的 autoStartuptaskExecutor 属性。 为每个监听器使用不同的执行器可能有助于在日志和线程转储中识别与每个监听器关联的线程。

2.2 版本还添加了 ackMode 属性,允许您覆盖容器工厂的 acknowledgeMode 属性。

@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

    ...
    channel.basicAck(tag, false);
}