监听器并发

SimpleMessageListenerContainer

默认情况下,监听器容器启动单个消费者来接收来自队列的消息。

在上一节的表格中,您可以看到许多控制并发的属性。最简单的是 concurrentConsumers,它创建了(固定)数量的消费者,这些消费者并发处理消息。

在 1.3.0 版本之前,这是唯一可用的设置,并且必须停止并再次启动容器才能更改设置。

自 1.3.0 版本以来,您现在可以动态调整 concurrentConsumers 属性。如果在容器运行时更改它,将根据需要添加或删除消费者以适应新设置。

此外,还添加了一个名为 maxConcurrentConsumers 的新属性,容器根据工作负载动态调整并发。这与另外四个属性一起工作:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。使用默认设置,增加消费者的算法如下:

如果 maxConcurrentConsumers 尚未达到,并且现有消费者连续十个周期处于活动状态,并且自上次启动消费者以来至少已过去 10 秒,则会启动一个新消费者。如果消费者在 batchSize * receiveTimeout 毫秒内至少收到一条消息,则认为它是活动的。

使用默认设置,减少消费者的算法如下:

如果有超过 concurrentConsumers 正在运行,并且消费者检测到连续十次超时(空闲),并且上次停止消费者至少在 60 秒前,则停止一个消费者。超时取决于 receiveTimeoutbatchSize 属性。如果消费者在 batchSize * receiveTimeout 毫秒内没有收到消息,则认为它是空闲的。因此,使用默认超时(一秒)和 batchSize 为四,在 40 秒的空闲时间后(四个超时对应一次空闲检测)考虑停止消费者。

实际上,只有当整个容器空闲一段时间后,消费者才能被停止。这是因为代理在其所有活动的消费者之间共享其工作。

每个消费者使用一个单独的通道,无论配置的队列数量如何。

从 2.0 版本开始,concurrentConsumersmaxConcurrentConsumers 属性可以通过 concurrency 属性设置——例如,2-4

使用 DirectMessageListenerContainer

使用此容器,并发基于配置的队列和 consumersPerQueue。每个队列的每个消费者使用一个单独的通道,并发由 rabbit 客户端库控制。默认情况下,在撰写本文时,它使用 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2 个线程池。

您可以配置 taskExecutor 来提供所需的最大并发。