配置 Broker
AMQP 规范描述了如何使用该协议在 Broker 上配置队列、交换器和绑定。这些操作(从 0.8 规范及更高版本可移植)存在于 org.springframework.amqp.core
包中的 AmqpAdmin
接口中。该类的 RabbitMQ 实现是 RabbitAdmin
,位于 org.springframework.amqp.rabbit.core
包中。AmqpAdmin
接口基于使用 Spring AMQP 领域抽象,其代码清单如下:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
另请参见 作用域操作。getQueueProperties()
方法返回关于队列的一些有限信息(消息计数和消费者计数)。返回的属性的键在 RabbitAdmin
中作为常量提供(QUEUE_NAME
、QUEUE_MESSAGE_COUNT
和 QUEUE_CONSUMER_COUNT
)。RabbitMQ REST API 在 QueueInfo
对象中提供了更多信息。无参数的 declareQueue()
方法在 Broker 上定义一个名称自动生成的队列。此自动生成队列的附加属性是 exclusive=true
、autoDelete=true
和 durable=false
。declareQueue(Queue queue)
方法接受一个 Queue
对象并返回已声明队列的名称。如果提供的 Queue
的 name
属性为空 String
,则 Broker 会声明一个生成名称的队列。该名称将返回给调用者。该名称也会添加到 Queue
的 actualName
属性中。您只能通过直接调用 RabbitAdmin
来以编程方式使用此功能。当在应用程序上下文中以声明方式定义队列时使用 Admin 进行自动声明时,您可以将 name
属性设置为空字符串 ""
。然后 Broker 会创建名称。从版本 2.1 开始,监听器容器可以使用这种类型的队列。有关详细信息,请参见 容器和 Broker 命名队列。这与 AnonymousQueue
不同,AnonymousQueue
中框架生成唯一的 (UUID
) 名称并将 durable
设置为 false
,exclusive
和 autoDelete
设置为 true
。一个 name
属性为空(或缺失)的 <rabbit:queue/>
总是创建一个 AnonymousQueue
。请参见 AnonymousQueue
以了解为什么 AnonymousQueue
优于 Broker 生成的队列名称以及如何控制名称的格式。从版本 2.1 开始,匿名队列默认使用参数 Queue.X_QUEUE_LEADER_LOCATOR
设置为 client-local
进行声明。这确保了队列在应用程序连接到的节点上声明。声明式队列必须具有固定名称,因为它们可能在上下文中的其他地方被引用——例如,在以下示例中所示的监听器中:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
请参阅 交换器、队列和绑定的自动声明。此接口的 RabbitMQ 实现是 RabbitAdmin
,当使用 Spring XML 配置时,它类似于以下示例:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
当 CachingConnectionFactory
缓存模式为 CHANNEL
(默认)时,RabbitAdmin
实现会自动延迟声明在同一 ApplicationContext
中声明的队列、交换器和绑定。这些组件会在与 Broker 建立 Connection
后立即声明。有一些命名空间特性使这非常方便——例如,在 Stocks 示例应用程序中,我们有以下内容:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在前面的示例中,我们使用匿名队列(实际上,在内部,只是框架生成的名称的队列,而不是 Broker 生成的)并通过 ID 引用它们。我们还可以声明具有显式名称的队列,这些名称也作为它们在上下文中的 bean 定义的标识符。以下示例配置了一个具有显式名称的队列:
<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供 |
队列可以配置附加参数——例如,x-message-ttl
。当您使用命名空间支持时,它们以参数名/参数值对的 Map
形式提供,通过 <rabbit:queue-arguments>
元素定义。以下示例展示了如何实现:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
默认情况下,参数被假定为字符串。对于其他类型的参数,您必须提供类型。以下示例显示了如何指定类型:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
当提供混合类型的参数时,您必须为每个 entry 元素提供类型。以下示例展示了如何实现:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
从 Spring Framework 3.2 及更高版本开始,这可以更简洁地声明,如下所示:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
当使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR
参数通过 Queue
类上的 setLeaderLocator()
方法作为一级属性支持。从版本 2.1 开始,匿名队列默认将此属性设置为 client-local
进行声明。这确保了队列在应用程序连接到的节点上声明。
RabbitMQ Broker 不允许声明具有不匹配参数的队列。例如,如果一个 queue
已经存在且没有 time to live
参数,并且您尝试用(例如)key="x-message-ttl" value="100"
声明它,则会抛出异常。
默认情况下,当发生任何异常时,RabbitAdmin
会立即停止处理所有声明。这可能会导致下游问题,例如监听器容器因另一个队列(在错误队列之后定义)未声明而无法初始化。此行为可以通过将 RabbitAdmin
实例上的 ignore-declaration-exceptions
属性设置为 true
来修改。此选项指示 RabbitAdmin
记录异常并继续声明其他元素。当使用 Java 配置 RabbitAdmin
时,此属性名为 ignoreDeclarationExceptions
。这是一个适用于所有元素的全局设置。队列、交换器和绑定具有一个类似的属性,该属性仅适用于这些元素。在 1.6 版本之前,此属性仅在通道上发生 IOException
时才生效,例如当当前属性和所需属性之间存在不匹配时。现在,此属性对任何异常都生效,包括 TimeoutException
和其他异常。此外,任何声明异常都会导致发布 DeclarationExceptionEvent
,这是一个可以由上下文中的任何 ApplicationListener
消费的 ApplicationEvent
。该事件包含对 admin、正在声明的元素和 Throwable
的引用。
Headers 交换器
从版本 1.3 开始,您可以配置 HeadersExchange
以匹配多个 Header。您还可以指定是否必须匹配任何或所有 Header。以下示例展示了如何实现:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
从版本 1.6 开始,您可以配置 Exchanges
具有 internal
标志(默认为 false
),并且这样的 Exchange
通过 RabbitAdmin
(如果应用程序上下文中存在)在 Broker 上正确配置。如果交换器的 internal
标志为 true
,RabbitMQ 将不允许客户端使用该交换器。这对于死信交换器或交换器到交换器绑定很有用,您不希望发布者直接使用该交换器。
要了解如何使用 Java 配置 AMQP 基础设施,请查看 Stock 示例应用程序,其中有一个 @Configuration
类 AbstractStockRabbitConfiguration
,它又包含 RabbitClientConfiguration
和 RabbitServerConfiguration
子类。以下清单显示了 AbstractStockRabbitConfiguration
的代码:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在 Stock 应用程序中,服务器使用以下 @Configuration
类进行配置:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
这是 @Configuration
类整个继承链的末尾。最终结果是 TopicExchange
和 Queue
在应用程序启动时声明到 Broker。在服务器配置中没有将 TopicExchange
绑定到队列,因为这在客户端应用程序中完成。然而,股票请求队列会自动绑定到 AMQP 默认交换器。此行为由规范定义。
客户端 @Configuration
类更有趣一些。它的声明如下:
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
客户端通过 AmqpAdmin
上的 declareQueue()
方法声明另一个队列。它将该队列绑定到市场数据交换器,并使用属性文件中外部化的路由模式。
队列和交换器的 Builder API
版本 1.6 引入了一个方便的流式 API,用于在使用 Java 配置时配置 Queue
和 Exchange
对象。以下示例展示了如何使用它:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
有关详细信息,请参阅 {spring-amqp-java-docs}/core/QueueBuilder.html[org.springframework.amqp.core.QueueBuilder
] 和 {spring-amqp-java-docs}/core/ExchangeBuilder.html[org.springframework.amqp.core.ExchangeBuilder
] 的 Javadoc。
从版本 2.0 开始,ExchangeBuilder
现在默认创建持久交换器,以与各个 AbstractExchange
类上的简单构造函数保持一致。要使用构建器创建非持久交换器,请在调用 .build()
之前使用 .durable(false)
。不再提供无参数的 durable()
方法。
版本 2.2 引入了流式 API 来添加“众所周知”的交换器和队列参数……
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
声明交换器、队列和绑定的集合
您可以将 Declarable
对象(Queue
、Exchange
和 Binding
)的集合封装在 Declarables
对象中。RabbitAdmin
会检测应用程序上下文中的此类 bean(以及离散的 Declarable
bean),并在建立连接时(初始连接和连接失败后)在 Broker 上声明包含的对象。以下示例展示了如何实现:
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
在 2.1 版本之前,您可以通过定义 Collection<Declarable>
类型的 bean 来声明多个 Declarable
实例。这在某些情况下可能会导致不良的副作用,因为 admin 必须遍历所有 Collection<?>
bean。
版本 2.2 添加了 getDeclarablesByType
方法到 Declarables
;这可以作为一种便利,例如在声明监听器容器 bean 时使用。
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
条件声明
默认情况下,所有队列、交换器和绑定都由应用程序上下文中的所有 RabbitAdmin
实例(假设它们具有 auto-startup="true"
)声明。
从版本 2.1.9 开始,RabbitAdmin
有一个新属性 explicitDeclarationsOnly
(默认为 false
);当设置为 true
时,admin 将只声明明确配置为由该 admin 声明的 bean。
从 1.2 版本开始,您可以有条件地声明这些元素。当应用程序连接到多个 Broker 并需要指定特定元素应由哪个 Broker 声明时,这尤其有用。 |
代表这些元素的类实现了 Declarable
,它有两个方法:shouldDeclare()
和 getDeclaringAdmins()
。RabbitAdmin
使用这些方法来确定特定实例是否应该实际处理其 Connection
上的声明。
这些属性在命名空间中作为属性提供,如下例所示:
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
默认情况下, |
类似地,您可以使用基于 Java 的 @Configuration
实现相同的效果。在以下示例中,组件由 admin1
声明,但不由 admin2
声明:
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
关于 id
和 name
属性的说明
<rabbit:queue/>
和 <rabbit:exchange/>
元素上的 name
属性反映了 Broker 中实体的名称。对于队列,如果省略 name
,则会创建一个匿名队列(请参见 AnonymousQueue
)。
在 2.0 版本之前,name
也被注册为 bean 名称别名(类似于 <bean/>
元素上的 name
)。
这导致了两个问题:
-
它阻止了声明具有相同名称的队列和交换器。
-
如果别名包含 SpEL 表达式 (
#{…}
),则无法解析。
从 2.0 版本开始,如果您使用 id
和 name
属性声明这些元素之一,则 name
不再声明为 bean 名称别名。如果您希望声明具有相同 name
的队列和交换器,则必须提供 id
。
如果元素只有 name
属性,则没有变化。bean 仍然可以通过 name
引用——例如,在绑定声明中。但是,如果名称包含 SpEL,您仍然无法引用它——您必须提供 id
用于引用。
AnonymousQueue
通常,当您需要一个唯一命名的、独占的、自动删除的队列时,我们建议您使用 AnonymousQueue
,而不是 Broker 定义的队列名称(使用 ""
作为 Queue
名称会导致 Broker 生成队列名称)。
这是因为:
-
队列实际上是在与 Broker 建立连接时声明的。这远在 bean 创建和连接之后。使用队列的 bean 需要知道其名称。事实上,应用程序启动时 Broker 甚至可能没有运行。
-
如果与 Broker 的连接因某种原因丢失,admin 会使用相同的名称重新声明
AnonymousQueue
。如果我们使用 Broker 声明的队列,队列名称会更改。
您可以控制 AnonymousQueue
实例使用的队列名称的格式。
默认情况下,队列名称以 spring.gen-
为前缀,后跟 UUID
的 base64 表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。
您可以在构造函数参数中提供 AnonymousQueue.NamingStrategy
实现。以下示例展示了如何实现:
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
第一个 bean 生成的队列名称以 spring.gen-
为前缀,后跟 UUID
的 base64 表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。第二个 bean 生成的队列名称以 something-
为前缀,后跟 UUID
的 base64 表示。第三个 bean 仅使用 UUID 生成名称(不进行 base64 转换)——例如,f20c818a-006b-4416-bf91-643590fedb0e
。
base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”。删除了尾随的填充字符 (=
)。
您可以提供自己的命名策略,从而在队列名称中包含其他信息(例如应用程序名称或客户端主机)。
当您使用 XML 配置时,可以指定命名策略。naming-strategy
属性存在于 <rabbit:queue>
元素上,用于实现 AnonymousQueue.NamingStrategy
的 bean 引用。以下示例展示了如何以各种方式指定命名策略:
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一个示例创建诸如 spring.gen-MRBv9sqISkuCiPfOYfpo4g
之类的名称。第二个示例创建带有 UUID 字符串表示的名称。第三个示例创建诸如 custom.gen-MRBv9sqISkuCiPfOYfpo4g
之类的名称。
您还可以提供自己的命名策略 bean。
从版本 2.1 开始,匿名队列默认使用参数 Queue.X_QUEUE_LEADER_LOCATOR
设置为 client-local
进行声明。这确保了队列在应用程序连接到的节点上声明。您可以通过在构造实例后调用 queue.setLeaderLocator(null)
来恢复到以前的行为。
恢复自动删除声明
通常,RabbitAdmin
只恢复在应用程序上下文中声明为 bean 的队列/交换器/绑定;如果任何此类声明是自动删除的,则如果连接丢失,它们将由 Broker 删除。当连接重新建立时,admin 将重新声明这些实体。通常,通过调用 admin.declareQueue(…)
、admin.declareExchange(…)
和 admin.declareBinding(…)
创建的实体将不会被恢复。
从版本 2.4 开始,admin 有一个新属性 redeclareManualDeclarations
;当设置为 true
时,admin 将恢复这些实体以及应用程序上下文中的 bean。
如果调用 deleteQueue(…)
、deleteExchange(…)
或 removeBinding(…)
,则不会执行单个声明的恢复。当队列和交换器被删除时,相关的绑定会从可恢复实体中移除。
最后,调用 resetAllManualDeclarations()
将阻止恢复任何先前声明的实体。