连接与资源管理

虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当我们进入资源管理时,具体细节则取决于代理的实现。 因此,在本节中,我们只关注 “spring-rabbit” 模块中存在的代码,因为目前 RabbitMQ 是唯一受支持的实现。 用于管理与 RabbitMQ 代理连接的核心组件是 ConnectionFactory 接口。 ConnectionFactory 实现的职责是提供 org.springframework.amqp.rabbit.connection.Connection 的实例,它是 com.rabbitmq.client.Connection 的包装器。

选择连接工厂

有三种连接工厂可供选择:

  • PooledChannelConnectionFactory

  • ThreadChannelConnectionFactory

  • CachingConnectionFactory

前两个是在 2.3 版本中添加的。

对于大多数用例,应使用 CachingConnectionFactory。 如果希望确保严格的消息顺序,而无需使用 作用域操作,则可以使用 ThreadChannelConnectionFactoryPooledChannelConnectionFactory 类似于 CachingConnectionFactory,因为它使用单个连接和通道池。 它的实现更简单,但不支持关联的发布者确认。

所有这三种工厂都支持简单的发布者确认。

从 2.3.2 版本开始,当配置 RabbitTemplate 使用 单独连接 时,现在可以将发布连接工厂配置为不同类型。 默认情况下,发布工厂与主工厂类型相同,并且主工厂上设置的任何属性也会传播到发布工厂。

从 3.1 版本开始,AbstractConnectionFactory 包含 connectionCreatingBackOff 属性,该属性支持连接模块中的退避策略。 目前,createChannel() 的行为支持处理达到 channelMax 限制时发生的异常,实现基于尝试和间隔的退避策略。

PooledChannelConnectionFactory

此工厂管理单个连接和两个通道池,基于 Apache Pool2。 一个池用于事务性通道,另一个用于非事务性通道。 这些池是 GenericObjectPool,具有默认配置;提供了回调以配置这些池;有关更多信息,请参阅 Apache 文档。

要使用此工厂,Apache commons-pool2 jar 必须位于类路径中。

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

ThreadChannelConnectionFactory

此工厂管理单个连接和两个 ThreadLocal,一个用于事务性通道,另一个用于非事务性通道。 此工厂确保同一线程上的所有操作都使用同一通道(只要它保持打开状态)。 这有助于实现严格的消息顺序,而无需 作用域操作。 为避免内存泄漏,如果您的应用程序使用许多短生命周期线程,则必须调用工厂的 closeThreadChannel() 来释放通道资源。 从 2.3.7 版本开始,线程可以将其通道传输到另一个线程。 有关更多信息,请参阅 多线程环境中的严格消息排序

CachingConnectionFactory

提供的第三个实现是 CachingConnectionFactory,它默认建立一个可以由应用程序共享的单个连接代理。 连接共享之所以可能,是因为 AMQP 消息传递的 “工作单元” 实际上是 “通道”(在某些方面,这类似于 JMS 中连接与会话之间的关系)。 连接实例提供 createChannel 方法。 CachingConnectionFactory 实现支持这些通道的缓存,并且它根据通道是否是事务性的来维护单独的通道缓存。 创建 CachingConnectionFactory 实例时,可以通过构造函数提供“主机名”。 还应提供“用户名”和“密码”属性。 要配置通道缓存的大小(默认值为 25),可以调用 setChannelCacheSize() 方法。

从 1.3 版本开始,您可以配置 CachingConnectionFactory 来缓存连接以及仅缓存通道。 在这种情况下,每次调用 createConnection() 都会创建一个新连接(或从缓存中检索一个空闲连接)。 关闭连接会将其返回到缓存(如果未达到缓存大小)。 在此类连接上创建的通道也会被缓存。 在某些环境中,使用单独的连接可能很有用,例如从 HA 集群消费,结合负载均衡器,连接到不同的集群成员等等。 要缓存连接,请将 cacheMode 设置为 CacheMode.CONNECTION

这不限制连接数。 相反,它指定允许有多少个空闲的打开连接。

从 1.5.5 版本开始,提供了一个名为 connectionLimit 的新属性。 设置此属性后,它会限制允许的总连接数。 设置后,如果达到限制,channelCheckoutTimeLimit 将用于等待连接变为空闲。 如果超出时间,则抛出 AmqpTimeoutException

当缓存模式为 CONNECTION 时,不支持队列等的自动声明 (请参阅 自动声明交换、队列和绑定)。 此外,在撰写本文时,amqp-client 库默认会为每个连接创建一个固定线程池(默认大小:Runtime.getRuntime().availableProcessors() * 2 个线程)。 当使用大量连接时,应考虑在 CachingConnectionFactory 上设置自定义 executor。 然后,所有连接都可以使用相同的执行器,并且其线程可以共享。 执行器的线程池应该是无界的,或者根据预期用途进行适当设置(通常,每个连接至少一个线程)。 如果每个连接上创建多个通道,则池大小会影响并发性,因此可变(或简单的缓存)线程池执行器将是最合适的。

重要的是要了解缓存大小(默认情况下)不是限制,而仅仅是可缓存的通道数。 假设缓存大小为 10,则实际上可以使用任意数量的通道。 如果使用了超过 10 个通道,并且它们都返回到缓存中,则 10 个进入缓存。 其余的物理关闭。

从 1.6 版本开始,默认通道缓存大小已从 1 增加到 25。 在高并发、多线程环境中,小缓存意味着通道以高速率创建和关闭。 增加默认缓存大小可以避免这种开销。 您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,如果看到许多通道正在创建和关闭,则考虑进一步增加缓存大小。 缓存仅按需增长(以适应应用程序的并发要求),因此此更改不会影响现有的低并发应用程序。

从 1.4.2 版本开始,CachingConnectionFactory 有一个名为 channelCheckoutTimeout 的属性。 当此属性大于零时,channelCacheSize 成为单个连接上可创建的通道数的限制。 如果达到限制,则调用线程会阻塞,直到通道可用或达到此超时,在这种情况下会抛出 AmqpTimeoutException

框架内部使用的通道(例如,RabbitTemplate)会可靠地返回到缓存。 如果您在框架外部创建通道(例如,通过直接访问连接并调用 createChannel()),则必须可靠地(通过关闭)返回它们,可能在 finally 块中,以避免耗尽通道。

以下示例显示如何创建新 connection

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

使用 XML 时,配置可能如下所示:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>

还有一个 SingleConnectionFactory 实现,仅在框架的单元测试代码中可用。 它比 CachingConnectionFactory 更简单,因为它不缓存通道,但由于其性能和弹性不足,不适用于简单测试之外的实际使用。 如果由于某种原因需要实现自己的 ConnectionFactoryAbstractConnectionFactory 基类可能会提供一个很好的起点。

使用 rabbit 命名空间可以快速方便地创建 ConnectionFactory,如下所示:

<rabbit:connection-factory id="connectionFactory"/>

在大多数情况下,这种方法更可取,因为框架可以为您选择最佳默认值。 创建的实例是 CachingConnectionFactory。 请记住,通道的默认缓存大小为 25。 如果希望缓存更多通道,请通过设置 'channelCacheSize' 属性来设置更大的值。 在 XML 中,它将如下所示:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

此外,使用命名空间,您可以添加 'channel-cache-size' 属性,如下所示:

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

默认缓存模式是 CHANNEL,但您可以将其配置为缓存连接。 在以下示例中,我们使用 connection-cache-size

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

您可以使用命名空间提供主机和端口属性,如下所示:

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

或者,如果在集群环境中运行,可以使用 addresses 属性,如下所示:

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有关 address-shuffle-mode 的信息,请参阅 连接到集群

以下示例使用自定义线程工厂,该工厂将线程名称前缀为 rabbitmq-

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

地址解析器

从 2.1.15 版本开始,您现在可以使用 AddressResolver 来解析连接地址。 这将覆盖 addresseshost/port 属性的任何设置。

命名连接

从 1.7 版本开始,提供了 ConnectionNameStrategy,用于注入到 AbstractionConnectionFactory 中。 生成的名称用于目标 RabbitMQ 连接的应用程序特定标识。 如果 RabbitMQ 服务器支持,连接名称将显示在管理 UI 中。 此值不必是唯一的,不能用作连接标识符——例如,在 HTTP API 请求中。 此值应具有人类可读性,并且是 ClientPropertiesconnection_name 键的一部分。 您可以使用简单的 Lambda,如下所示:

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 参数可用于通过某些逻辑区分目标连接名称。 默认情况下,AbstractConnectionFactorybeanName、表示对象的十六进制字符串和内部计数器用于生成 connection_name<rabbit:connection-factory> 命名空间组件也提供了 connection-name-strategy 属性。

SimplePropertyValueConnectionNameStrategy 的实现将连接名称设置为应用程序属性。 您可以将其声明为 @Bean 并将其注入到连接工厂中,如下例所示:

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

该属性必须存在于应用程序上下文的 Environment 中。

使用 Spring Boot 及其自动配置的连接工厂时,您只需声明 ConnectionNameStrategy @Bean 即可。 Boot 会自动检测 bean 并将其连接到工厂。

阻塞连接和资源限制

连接可能因代理的交互而被阻塞,这对应于 内存警报。 从 2.0 版本开始,org.springframework.amqp.rabbit.connection.Connection 可以提供 com.rabbitmq.client.BlockedListener 实例,以接收连接阻塞和解除阻塞事件的通知。 此外,AbstractConnectionFactory 通过其内部 BlockedListener 实现分别发出 ConnectionBlockedEventConnectionUnblockedEvent。 这些允许您提供应用程序逻辑,以适当地响应代理上的问题并(例如)采取一些纠正措施。

当应用程序配置了单个 CachingConnectionFactory(如 Spring Boot 自动配置默认情况)时,当连接被 Broker 阻塞时,应用程序将停止工作。 当它被 Broker 阻塞时,其任何客户端都将停止工作。 如果我们在同一个应用程序中同时有生产者和消费者,当生产者阻塞连接(因为 Broker 上没有更多资源)而消费者无法释放它们(因为连接被阻塞)时,我们可能会陷入死锁。 为了缓解这个问题,我们建议再拥有一个单独的 CachingConnectionFactory 实例,具有相同的选项——一个用于生产者,一个用于消费者。 对于在消费者线程上执行的事务性生产者来说,单独的 CachingConnectionFactory 是不可能的,因为它们应该重用与消费者事务关联的 Channel

从 2.0.2 版本开始,RabbitTemplate 有一个配置选项,可以自动使用第二个连接工厂,除非正在使用事务。 有关更多信息,请参阅 使用单独连接。 发布者连接的 ConnectionNameStrategy 与主策略相同,只是在调用方法的结果后附加了 .publisher

从 1.7.7 版本开始,提供了 AmqpResourceNotAvailableException,当 SimpleConnection.createChannel() 无法创建 Channel(例如,因为达到 channelMax 限制且缓存中没有可用通道)时,将抛出此异常。 您可以在 RetryPolicy 中使用此异常,以便在退避后恢复操作。

配置底层客户端连接工厂

CachingConnectionFactory 使用 Rabbit 客户端 ConnectionFactory 的实例。 设置 CachingConnectionFactory 上等效属性时,许多配置属性会传递(例如 hostportuserNamepasswordrequestedHeartBeatconnectionTimeout)。 要设置其他属性(例如 clientProperties),您可以定义 Rabbit 工厂的实例,并通过使用 CachingConnectionFactory 的适当构造函数提供对其的引用。 使用命名空间时(如 前面所述),您需要在 connection-factory 属性中提供对已配置工厂的引用。 为方便起见,提供了一个工厂 bean,以帮助在 Spring 应用程序上下文中配置连接工厂,如 下一节所述。

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>

4.0.x 客户端默认启用自动恢复。 虽然与此功能兼容,但 Spring AMQP 有自己的恢复机制,通常不需要客户端恢复功能。 我们建议禁用 amqp-client 自动恢复,以避免在代理可用但连接尚未恢复时出现 AutoRecoverConnectionNotCurrentlyOpenException 实例。 例如,当 RabbitTemplate 中配置了 RetryTemplate 时,即使故障转移到集群中的另一个代理,您也可能会注意到此异常。 由于自动恢复连接在计时器上恢复,因此使用 Spring AMQP 的恢复机制可能会更快地恢复连接。 从 1.7.1 版本开始,Spring AMQP 默认禁用 amqp-client 自动恢复,除非您明确创建自己的 RabbitMQ 连接工厂并将其提供给 CachingConnectionFactoryRabbitConnectionFactoryBean 创建的 RabbitMQ ConnectionFactory 实例也默认禁用该选项。

RabbitConnectionFactoryBean 和配置 SSL

从 1.4 版本开始,提供了一个方便的 RabbitConnectionFactoryBean,通过使用依赖注入,可以方便地配置底层客户端连接工厂的 SSL 属性。 其他 setter 委托给底层工厂。 以前,您必须以编程方式配置 SSL 选项。 以下示例显示如何配置 RabbitConnectionFactoryBean

Java
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    factoryBean.setUseSSL(true);
    factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
    return factoryBean;
}

@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
    CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
    ccf.setHost("...");
    // ...
    return ccf;
}
Boot application.properties
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
XML
<rabbit:connection-factory id="rabbitConnectionFactory"
    connection-factory="clientConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>

有关配置 SSL 的信息,请参阅 RabbitMQ 文档。 省略 keyStoretrustStore 配置以在没有证书验证的情况下通过 SSL 连接。 下一个示例显示如何提供密钥和信任库配置。

sslPropertiesLocation 属性是 Spring Resource,指向一个包含以下键的属性文件:

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStoretruststore 是指向存储的 Spring Resources。 通常,此属性文件由操作系统保护,应用程序具有读取权限。

从 Spring AMQP 1.5 版本开始,您可以直接在工厂 bean 上设置这些属性。 如果同时提供了离散属性和 sslPropertiesLocation,则后者中的属性将覆盖离散值。

从 2.0 版本开始,默认情况下服务器证书会进行验证,因为它更安全。 如果出于某种原因希望跳过此验证,请将工厂 bean 的 skipServerCertificateValidation 属性设置为 true。 从 2.1 版本开始,RabbitConnectionFactoryBean 现在默认调用 enableHostnameVerification()。 要恢复到以前的行为,请将 enableHostnameVerification 属性设置为 false

从 2.2.5 版本开始,工厂 bean 将默认始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1,在其他情况下使用 v1.2(取决于其他属性)。 如果由于某种原因需要使用 v1.1,请设置 sslAlgorithm 属性:setSslAlgorithm("TLSv1.1")

连接到集群

要连接到集群,请在 CachingConnectionFactory 上配置 addresses 属性:

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    return ccf;
}

从 3.0 版本开始,底层连接工厂将在建立新连接时尝试通过选择随机地址来连接到主机。 要恢复到从第一个到最后一个尝试连接的先前行为,请将 addressShuffleMode 属性设置为 AddressShuffleMode.NONE

从 2.3 版本开始,添加了 INORDER 混洗模式,这意味着在创建连接后,第一个地址会移动到末尾。 如果您希望在所有节点上从所有分片消费,您可能希望将此模式与 {rabbitmq-server-github}/rabbitmq_sharding[RabbitMQ 分片插件]、CacheMode.CONNECTION 和适当的并发性一起使用。

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
    return ccf;
}

路由连接工厂

从 1.3 版本开始,引入了 AbstractRoutingConnectionFactory。 此工厂提供了一种机制,用于配置多个 ConnectionFactories 的映射,并在运行时通过某个 lookupKey 确定目标 ConnectionFactory。 通常,实现会检查线程绑定的上下文。 为方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它从 SimpleResourceHolder 获取当前线程绑定的 lookupKey。 以下示例显示如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
            <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
        </map>
    </property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

使用后解除绑定资源非常重要。 有关更多信息,请参阅 {spring-amqp-java-docs}/rabbit/connection/AbstractRoutingConnectionFactory.html[JavaDoc] 中关于 AbstractRoutingConnectionFactory 的内容。

从 1.4 版本开始,RabbitTemplate 支持 SpEL sendConnectionFactorySelectorExpressionreceiveConnectionFactorySelectorExpression 属性,这些属性在每次 AMQP 协议交互操作(sendsendAndReceivereceivereceiveAndReply)时进行评估,解析为提供给 AbstractRoutingConnectionFactorylookupKey 值。 您可以在表达式中使用 bean 引用,例如 @vHostResolver.getVHost(#root)。 对于 send 操作,要发送的消息是根评估对象。 对于 receive 操作,queueName 是根评估对象。

路由算法如下:如果选择器表达式为 null 或评估为 null,或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的实例,则一切照旧,依赖于提供的 ConnectionFactory 实现。 如果评估结果不为 null,但该 lookupKey 没有目标 ConnectionFactory,并且 AbstractRoutingConnectionFactory 配置了 lenientFallback = true,也会发生同样的情况。 在 AbstractRoutingConnectionFactory 的情况下,它会根据 determineCurrentLookupKey() 回退到其 routing 实现。 但是,如果 lenientFallback = false,则会抛出 IllegalStateException

命名空间支持还为 <rabbit:template> 组件提供了 send-connection-factory-selector-expressionreceive-connection-factory-selector-expression 属性。

此外,从 1.4 版本开始,您可以在监听器容器中配置路由连接工厂。 在这种情况下,队列名称列表用作查找键。 例如,如果将容器配置为侦听 setQueueNames("thing1", "thing2"),则查找键为 [thing1,thing]"(请注意键中没有空格)。

从 1.6.9 版本开始,您可以使用监听器容器上的 setLookupKeyQualifier 向查找键添加限定符。 这样做可以,例如,侦听具有相同名称但位于不同虚拟主机中的队列(在这种情况下,您将为每个虚拟主机提供一个连接工厂)。

例如,使用查找键限定符 thing1 和侦听队列 thing2 的容器,您可以注册目标连接工厂的查找键可以是 thing1[thing2]

目标(如果提供,则为默认)连接工厂必须具有相同的发布者确认和返回设置。 请参阅 发布者确认和返回

从 2.4.4 版本开始,可以禁用此验证。 如果您在某些情况下确认和返回的值需要不相等,您可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 来关闭验证。 请注意,添加到 AbstractRoutingConnectionFactory 的第一个连接工厂将决定 confirmsreturns 的一般值。

如果您在某些情况下希望检查某些消息的确认/返回,而另一些则不检查,这可能会很有用。 例如:

@Bean
public RabbitTemplate rabbitTemplate() {
    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
    cf.setHost("localhost");
    cf.setPort(5672);

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

    final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
    connectionFactoryMap.put("true", cachingConnectionFactory);
    connectionFactoryMap.put("false", pooledChannelConnectionFactory);

    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
    routingConnectionFactory.setConsistentConfirmsReturns(false);
    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

    final Expression sendExpression = new SpelExpressionParser().parseExpression(
            "messageProperties.headers['x-use-publisher-confirms'] ?: false");
    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

这样,带有头 x-use-publisher-confirms: true 的消息将通过缓存连接发送,您可以确保消息传递。 有关确保消息传递的更多信息,请参阅 发布者确认和返回

队列亲和性与 LocalizedQueueConnectionFactory

在集群中使用 HA 队列时,为了获得最佳性能,您可能希望连接到主队列所在的物理代理。 CachingConnectionFactory 可以配置多个代理地址。 这是为了故障转移,客户端会根据配置的 AddressShuffleMode 顺序尝试连接。 LocalizedQueueConnectionFactory 使用管理插件提供的 REST API 来确定哪个节点是队列的主节点。 然后,它会创建(或从缓存中检索)一个 CachingConnectionFactory,该工厂仅连接到该节点。 如果连接失败,则会确定新的主节点,并且消费者连接到该节点。 LocalizedQueueConnectionFactory 配置了一个默认连接工厂,以防无法确定队列的物理位置,在这种情况下,它会正常连接到集群。

LocalizedQueueConnectionFactory 是一个 RoutingConnectionFactorySimpleMessageListenerContainer 使用队列名称作为查找键,如上文 路由连接工厂 中所述。

因此(使用队列名称进行查找),LocalizedQueueConnectionFactory 只能在容器配置为侦听单个队列时使用。

RabbitMQ 管理插件必须在每个节点上启用。

此连接工厂旨在用于长期连接,例如 SimpleMessageListenerContainer 使用的连接。 它不适用于短连接使用,例如与 RabbitTemplate 一起使用,因为在建立连接之前调用 REST API 会产生开销。 此外,对于发布操作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑价值不大。

以下配置示例显示了如何配置工厂:

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

请注意,前三个参数是 addressesadminUrisnodes 的数组。 这些是位置参数,当容器尝试连接到队列时,它使用管理 API 来确定哪个节点是队列的主节点,并连接到与该节点在同一数组位置的地址。

从 3.0 版本开始,不再使用 RabbitMQ http-client 访问 Rest API。 相反,默认情况下,如果 spring-webflux 在类路径中,则使用 Spring Webflux 的 WebClient;否则使用 RestTemplate

WebFlux 添加到类路径:

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit'

您还可以通过实现 LocalizedQueueConnectionFactory.NodeLocator 并覆盖其 createClientrestCall 以及可选的 close 方法来使用其他 REST 技术。

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

    @Override
    public MyClient createClient(String userName, String password) {
        ...
    }

    @Override
    public HashMap<String, Object> restCall(MyClient client, URI uri) {
        ...
    });

});

该框架提供了 WebFluxNodeLocatorRestTemplateNodeLocator,默认如上所述。

发布者确认和返回

通过将 CachingConnectionFactory 属性 publisherConfirmType 设置为 ConfirmType.CORRELATED,并将 publisherReturns 属性设置为 true,支持确认(带关联)和返回消息。

当设置这些选项时,工厂创建的 Channel 实例将包装在 PublisherCallbackChannel 中,该通道用于促进回调。 当获得此类通道时,客户端可以向 Channel 注册一个 PublisherCallbackChannel.ListenerPublisherCallbackChannel 实现包含将确认或返回路由到相应侦听器的逻辑。 这些功能将在以下部分进一步解释。

另请参阅 关联发布者确认和返回作用域操作 中的 simplePublisherConfirms

有关更多背景信息,请参阅 RabbitMQ 团队的博客文章 Introducing Publisher Confirms

连接和通道监听器

连接工厂支持注册 ConnectionListenerChannelListener 实现。 这允许您接收连接和通道相关事件的通知。 (RabbitAdmin 使用 ConnectionListener 在建立连接时执行声明 - 有关更多信息,请参阅 自动声明交换、队列和绑定)。 以下列表显示了 ConnectionListener 接口定义:

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

从 2.0 版本开始,org.springframework.amqp.rabbit.connection.Connection 对象可以提供 com.rabbitmq.client.BlockedListener 实例,以接收连接阻塞和解除阻塞事件的通知。 以下示例显示了 ChannelListener 接口定义:

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

有关您可能希望注册 ChannelListener 的一种场景,请参阅 发布是异步的——如何检测成功和失败

记录通道关闭事件

1.5 版本引入了一种机制,允许用户控制日志级别。

AbstractConnectionFactory 使用默认策略记录通道关闭事件,如下所示:

  • 正常通道关闭(200 OK)不会被记录。

  • 如果通道因被动队列声明失败而关闭,则以 DEBUG 级别记录。

  • 如果通道因独占消费者条件导致 basic.consume 被拒绝而关闭,则以 DEBUG 级别记录(从 3.1 开始,以前是 INFO)。

  • 所有其他事件都以 ERROR 级别记录。

要修改此行为,您可以将自定义 ConditionalExceptionLogger 注入 CachingConnectionFactorycloseExceptionLogger 属性中。

此外,AbstractConnectionFactory.DefaultChannelCloseLogger 现在是公共的,允许对其进行子类化。

另请参阅 消费者事件

运行时缓存属性

从 1.6 版本开始,CachingConnectionFactory 现在通过 getCacheProperties() 方法提供缓存统计信息。 这些统计信息可用于调整缓存以在生产环境中对其进行优化。 例如,高水位标记可用于确定是否应增加缓存大小。 如果它等于缓存大小,您可能需要考虑进一步增加。 下表描述了 CacheMode.CHANNEL 属性:

Table 1. Cache properties for CacheMode.CHANNEL
属性 含义
   connectionName

ConnectionNameStrategy 生成的连接名称。

   channelCacheSize

当前配置的允许空闲的最大通道数。

   localPort

连接的本地端口(如果可用)。 这可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。

   idleChannelsTx

当前空闲(缓存)的事务性通道数。

   idleChannelsNotTx

当前空闲(缓存)的非事务性通道数。

   idleChannelsTxHighWater

并发空闲(缓存)的事务性通道的最大数量。

   idleChannelsNotTxHighWater

并发空闲(缓存)的非事务性通道的最大数量。

下表描述了 CacheMode.CONNECTION 属性:

Table 2. Cache properties for CacheMode.CONNECTION
属性 含义
   connectionName:&amp;lt;localPort&amp;gt;

ConnectionNameStrategy 生成的连接名称。

   openConnections

表示与代理连接的连接对象数。

   channelCacheSize

当前配置的允许空闲的最大通道数。

   connectionCacheSize

当前配置的允许空闲的最大连接数。

   idleConnections

当前空闲的连接数。

   idleConnectionsHighWater

并发空闲的连接的最大数量。

   idleChannelsTx:&amp;lt;localPort&amp;gt;

此连接当前空闲(缓存)的事务性通道数。 您可以使用属性名称的 localPort 部分与 RabbitMQ 管理 UI 上的连接和通道相关联。

   idleChannelsNotTx:&amp;lt;localPort&amp;gt;

此连接当前空闲(缓存)的非事务性通道数。 属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。

   idleChannelsTxHighWater:&amp;lt;localPort&amp;gt;

并发空闲(缓存)的事务性通道的最大数量。 属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。

   idleChannelsNotTxHighWater:&amp;lt;localPort&amp;gt;

并发空闲(缓存)的非事务性通道的最大数量。 您可以使用属性名称的 localPort 部分与 RabbitMQ 管理 UI 上的连接和通道相关联。

cacheMode 属性(CHANNELCONNECTION)也包含在内。

cacheStats
Figure 1. JVisualVM 示例

RabbitMQ 自动连接/拓扑恢复

自 Spring AMQP 的第一个版本以来,该框架在代理发生故障时提供了自己的连接和通道恢复功能。 此外,如 配置代理 中所述,当重新建立连接时,RabbitAdmin 会重新声明任何基础设施 bean(队列等)。 因此,它不依赖于 amqp-client 库现在提供的 自动恢复功能。 amqp-client 默认启用自动恢复。 两种恢复机制之间存在一些不兼容性,因此,Spring 默认将底层 RabbitMQ connectionFactory 上的 automaticRecoveryEnabled 属性设置为 false。 即使该属性为 true,Spring 也会通过立即关闭任何已恢复的连接来有效地禁用它。

默认情况下,只有定义为 bean 的元素(队列、交换、绑定)才会在连接失败后重新声明。 有关如何更改该行为的信息,请参阅 恢复自动删除声明