异步消费者
Spring AMQP 还支持通过使用 @RabbitListener
注解来处理带注解的监听器端点,并提供了开放的基础设施以编程方式注册端点。
这是设置异步消费者最方便的方法。
有关更多详细信息,请参阅 注解驱动的监听器端点。
prefetch 的默认值曾经是 1,这可能导致高效的消费者未能得到充分利用。 从 2.0 版本开始,默认的 prefetch 值现在是 250,这在大多数常见场景中应该能让消费者保持忙碌, 从而提高吞吐量。 然而,在某些情况下,prefetch 值应该设置得较低:
-
对于大消息,特别是处理速度较慢的情况下(消息在客户端进程中可能会占用大量内存)
-
当严格的消息排序是必需的(在这种情况下,prefetch 值应该重新设置为 1)
-
其他特殊情况
此外,在消息量较低且消费者较多(包括单个监听器容器实例内的并发)的情况下,您可能希望降低 prefetch 值,以使消息在消费者之间分布得更均匀。 请参阅 消息监听器容器配置。 有关 prefetch 的更多背景信息,请参阅这篇关于 RabbitMQ 中消费者利用率的帖子 以及这篇关于 排队理论的帖子。
消息监听器
对于异步 Message
接收,涉及到一个专门的组件(而不是 AmqpTemplate
)。
该组件是 Message
消费回调的容器。
我们将在本节后面讨论容器及其属性。
但是,首先,我们应该看看回调,因为那是您的应用程序代码与消息系统集成的地方。
回调有几个选项,从 MessageListener
接口的实现开始,以下列表显示了该接口:
public interface MessageListener {
void onMessage(Message message);
}
如果您的回调逻辑出于任何原因依赖于 AMQP Channel 实例,您可以转而使用 ChannelAwareMessageListener
。
它看起来相似,但有一个额外的参数。
以下列表显示了 ChannelAwareMessageListener
接口定义:
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版本中,此接口从 o.s.amqp.rabbit.core
包移动到 o.s.amqp.rabbit.listener.api
。
MessageListenerAdapter
如果您希望在应用程序逻辑和消息 API 之间保持更严格的分离,您可以依赖框架提供的适配器实现。
这通常被称为“消息驱动 POJO
”支持。
1.5 版本引入了一种更灵活的 POJO 消息传递机制,即 |
使用适配器时,您只需提供适配器本身应调用的实例的引用。 以下示例显示了如何执行此操作:
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
您可以子类化适配器并提供 getListenerMethodName()
的实现,以根据消息动态选择不同的方法。
此方法有两个参数,originalMessage
和 extractedMessage
,后者是任何转换的结果。
默认情况下,配置了 SimpleMessageConverter
。
有关更多信息以及有关其他可用转换器的信息,请参阅 SimpleMessageConverter
。
从 1.4.2 版本开始,原始消息具有 consumerQueue
和 consumerTag
属性,可用于确定消息是从哪个队列接收的。
从 1.5 版本开始,您可以配置一个消费者队列或标签到方法名称的映射,以动态选择要调用的方法。
如果映射中没有条目,我们将回退到默认的监听器方法。
默认的监听器方法(如果未设置)是 handleMessage
。
从 2.0 版本开始,提供了一个方便的 FunctionalInterface
。
以下列表显示了 FunctionalInterface
的定义:
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
此接口通过使用 Java 8 lambda 简化了适配器的方便配置,如下例所示:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
从 2.2 版本开始,buildListenerArguments(Object)
已被弃用,并引入了新的 buildListenerArguments(Object, Channel, Message)
。
新方法有助于监听器获取 Channel
和 Message
参数,以执行更多操作,例如在手动确认模式下调用 channel.basicReject(long, boolean)
。
以下列表显示了最基本的示例:
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
现在,如果您需要接收“channel
”和“message
”,您可以像配置 MessageListenerAdapter
一样配置 ExtendedListenerAdapter
。
监听器的参数应设置为 buildListenerArguments(Object, Channel, Message)
的返回值,如下面的监听器示例所示:
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
容器
现在您已经了解了 Message
监听回调的各种选项,我们可以将注意力转向容器。
基本上,容器处理“活动
”职责,以便监听器回调保持被动。
容器是“生命周期
”组件的一个示例。
它提供了启动和停止的方法。
配置容器时,您实质上弥合了 AMQP 队列和 MessageListener
实例之间的差距。
您必须提供 ConnectionFactory
的引用以及监听器应从中消费消息的队列名称或队列实例。
在 2.0 版本之前,只有一个监听器容器,即 SimpleMessageListenerContainer
。
现在有了第二个容器,即 DirectMessageListenerContainer
。
容器之间的差异以及您在选择使用哪个容器时可能应用的条件在 选择容器中进行了描述。
以下列表显示了最基本的示例,它通过使用 SimpleMessageListenerContainer
来工作:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作为“活动
”组件,最常见的是使用 bean 定义创建监听器容器,以便它可以在后台运行。
以下示例显示了使用 XML 执行此操作的一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以下列表显示了使用 XML 执行此操作的另一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
前面的两个示例都创建了一个 DirectMessageListenerContainer
(注意 type
属性——它默认为 simple
)。
或者,您可能更喜欢使用 Java 配置,它看起来与前面的代码片段相似:
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
消费者优先级
从 RabbitMQ 3.2 版本开始,代理现在支持消费者优先级(请参阅 使用 RabbitMQ 的消费者优先级)。
这通过在消费者上设置 x-priority
参数来启用。
SimpleMessageListenerContainer
现在支持设置消费者参数,如下例所示:
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
为方便起见,命名空间在 listener
元素上提供了 priority
属性,如下例所示:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
从 1.3 版本开始,您可以在运行时修改容器监听的队列。 请参阅 监听器容器队列。
auto-delete
队列
当容器配置为监听 auto-delete
队列时,队列具有 x-expires
选项,或者在代理上配置了 Time-To-Live 策略,当容器停止时(即当最后一个消费者被取消时),队列将被代理删除。
在 1.3 版本之前,由于队列丢失,容器无法重新启动。
RabbitAdmin
只在连接关闭或打开时自动重新声明队列等,这在容器停止和启动时不会发生。
从 1.3 版本开始,容器在启动时使用 RabbitAdmin
重新声明任何丢失的队列。
您还可以使用条件声明(请参阅 条件声明)以及 auto-startup="false"
管理员来推迟队列声明,直到容器启动。
以下示例显示了如何执行此操作:
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
在这种情况下,队列和交换由 containerAdmin
声明,containerAdmin
的 auto-startup="false"
,因此在上下文初始化期间不会声明这些元素。
同样,容器也不会因相同的原因而启动。
当容器稍后启动时,它会使用其对 containerAdmin
的引用来声明这些元素。