线程和异步消费者

异步消费者涉及许多不同的线程。

RabbitMQ 客户端 传递新消息时,SimpleMessageListenerContainer 中配置的 TaskExecutor 的线程用于调用 MessageListener。 如果未配置,则使用 SimpleAsyncTaskExecutor。 如果使用线程池执行器,则需要确保线程池大小足以处理配置的并发量。 使用 DirectMessageListenerContainer 时,MessageListener 直接在 RabbitMQ 客户端 线程上调用。 在这种情况下,taskExecutor 用于监视消费者的任务。

当使用默认的 SimpleAsyncTaskExecutor 时,对于调用侦听器的线程,侦听器容器的 beanName 用作 threadNamePrefix。 这对于日志分析很有用。 我们通常建议始终在日志记录附加器配置中包含线程名称。 当通过容器上的 taskExecutor 属性明确提供 TaskExecutor 时,它会按原样使用,不进行修改。 建议您使用类似的技术来命名由自定义 TaskExecutor bean 定义创建的线程,以帮助在日志消息中识别线程。

CachingConnectionFactory 中配置的 Executor 在创建连接时传递给 RabbitMQ 客户端,其线程用于将新消息传递到侦听器容器。 如果未配置,客户端将使用内部线程池执行器,每个连接的线程池大小(撰写本文时)为 Runtime.getRuntime().availableProcessors() * 2

如果您有大量工厂或正在使用 CacheMode.CONNECTION,您可能希望考虑使用共享的 ThreadPoolTaskExecutor,并提供足够的线程来满足您的工作负载。

使用 DirectMessageListenerContainer 时,您需要确保连接工厂配置了一个任务执行器,该执行器具有足够的线程来支持所有使用该工厂的侦听器容器所需的并发量。 默认线程池大小(撰写本文时)为 Runtime.getRuntime().availableProcessors() * 2

RabbitMQ 客户端 使用 ThreadFactory 为低级 I/O(套接字)操作创建线程。 要修改此工厂,您需要配置底层的 RabbitMQ ConnectionFactory,如 配置底层客户端连接工厂 中所述。