AMQP 抽象

Spring AMQP 包含两个模块(每个模块在发行版中都由一个 JAR 表示):spring-amqpspring-rabbitspring-amqp 模块包含 org.springframework.amqp.core 包。 在该包中,您可以找到代表核心 AMQP “模型” 的类。 我们的目的是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。 最终用户代码可以仅针对抽象层进行开发,从而在不同供应商实现之间更具可移植性。 然后,这些抽象由特定于代理的模块(例如 'spring-rabbit')实现。 目前,只有 RabbitMQ 的实现。 然而,除了 RabbitMQ 之外,这些抽象也已在 .NET 中使用 Apache Qpid 进行了验证。 由于 AMQP 在协议级别操作,原则上,您可以将 RabbitMQ 客户端与任何支持相同协议版本的代理一起使用,但我们目前不测试任何其他代理。 本概述假设您已经熟悉 AMQP 规范的基础知识。 如果不是,请查看 其他资源 中列出的资源。

Message

0-9-1 AMQP 规范没有定义 Message 类或接口。 相反,在执行 basicPublish() 等操作时,内容作为字节数组参数传递,附加属性作为单独的参数传递。 Spring AMQP 定义了一个 Message 类,作为更通用的 AMQP 领域模型表示的一部分。 Message 类的目的是将正文和属性封装在单个实例中,以便 API 反过来可以更简单。 以下示例显示了 Message 类的定义:

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
}

MessageProperties 接口定义了几个常用属性,例如 'messageId'、'timestamp'、'contentType' 等。 您还可以通过调用 setHeader(String key, Object value) 方法,用用户定义的 'headers' 扩展这些属性。

1.5.71.6.111.7.42.0.0 版本开始,如果消息正文是序列化的 Serializable java 对象,则在执行 toString() 操作(例如在日志消息中)时,它不再(默认情况下)被反序列化。 这是为了防止不安全的 deserialization。 默认情况下,只有 java.utiljava.lang 类被反序列化。 要恢复到以前的行为,可以通过调用 Message.addAllowedListPatterns(…​) 添加允许的类/包模式。 支持简单的 通配符,例如 com.something., *.MyClass。 无法反序列化的正文在日志消息中表示为 byte[<size>]

Exchange

Exchange 接口表示 AMQP Exchange,这是消息生产者发送到的地方。 代理的虚拟主机中的每个 Exchange 都有一个唯一的名称以及其他一些属性。 以下示例显示了 Exchange 接口:

public interface Exchange {

    String getName();

    String getExchangeType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

}

如您所见,Exchange 还有一个由 ExchangeTypes 中定义的常量表示的“类型”。 基本类型是:directtopicfanoutheaders。 在核心包中,您可以找到这些类型中每种类型的 Exchange 接口实现。 这些 Exchange 类型的行为在如何处理与队列的绑定方面有所不同。 例如,Direct 交换机允许队列通过固定的路由键(通常是队列的名称)进行绑定。 Topic 交换机支持带有路由模式的绑定,路由模式可能包括用于“恰好一个”和“零个或多个”的 '*' 和 '#' 通配符。 Fanout 交换机将消息发布到所有绑定到它的队列,而不考虑任何路由键。 有关这些以及其他 Exchange 类型的更多信息,请参阅 AMQP Exchanges

从 3.2 版本开始,引入了 ConsistentHashExchange 类型,以便在应用程序配置阶段提供便利。 它提供了 x-consistent-hash 等选项作为交换机类型。 允许配置 hash-headerhash-property 交换机定义参数。 相应的 RabbitMQ rabbitmq_consistent_hash_exchange 插件必须在代理上启用。 有关一致哈希交换的目的、逻辑和行为的更多信息,请参阅官方 RabbitMQ {rabbitmq-server-github}/rabbitmq_consistent_hash_exchange[文档]。

AMQP 规范还要求任何代理提供一个没有名称的“默认” direct 交换机。 所有声明的队列都绑定到该默认 Exchange,并以其名称作为路由键。 您可以在 AmqpTemplate 中了解有关 Spring AMQP 中默认 Exchange 用法的更多信息。

Queue

Queue 类表示消息消费者从中接收消息的组件。 与各种 Exchange 类一样,我们的实现旨在成为此核心 AMQP 类型的抽象表示。 以下列表显示了 Queue 类:

public class Queue  {

    private final String name;

    private volatile boolean durable;

    private volatile boolean exclusive;

    private volatile boolean autoDelete;

    private volatile Map<String, Object> arguments;

    /**
     * The queue is durable, non-exclusive and non auto-delete.
     *
     * @param name the name of the queue.
     */
    public Queue(String name) {
        this(name, true, false, false);
    }

    // Getters and Setters omitted for brevity

}

请注意,构造函数接受队列名称。 根据实现的不同,管理模板可能会提供用于生成唯一命名队列的方法。 此类队列可用作“reply-to”地址或在其他 临时 情况下使用。 因此,自动生成队列的 'exclusive' 和 'autoDelete' 属性都将设置为 'true'。

有关使用命名空间支持声明队列(包括队列参数)的信息,请参阅 配置代理 中的队列部分。

Binding

鉴于生产者发送到交换机,消费者从队列接收消息,连接队列到交换机的绑定对于通过消息传递连接这些生产者和消费者至关重要。 在 Spring AMQP 中,我们定义了一个 Binding 类来表示这些连接。 本节回顾了将队列绑定到交换机的基本选项。

您可以使用固定的路由键将队列绑定到 DirectExchange,如以下示例所示:

new Binding(someQueue, someDirectExchange, "foo.bar");

您可以使用路由模式将队列绑定到 TopicExchange,如以下示例所示:

new Binding(someQueue, someTopicExchange, "foo.*");

您可以将队列绑定到 FanoutExchange 而不使用路由键,如以下示例所示:

new Binding(someQueue, someFanoutExchange);

我们还提供了一个 BindingBuilder 来促进“流畅 API”风格,如以下示例所示:

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

为了清晰起见,前面的示例显示了 BindingBuilder 类,但当使用 'bind()' 方法的静态导入时,这种风格效果很好。

Binding 类的实例本身只包含有关连接的数据。 换句话说,它不是一个“活动”组件。 然而,正如您稍后将在 配置代理 中看到的,AmqpAdmin 类可以使用 Binding 实例来实际触发代理上的绑定操作。 此外,正如您在同一部分中看到的,您可以使用 Spring 的 @Bean 注解在 @Configuration 类中定义 Binding 实例。 还有一个方便的基类,它进一步简化了为生成 AMQP 相关 bean 定义的方法,并识别队列、交换机和绑定,以便它们在应用程序启动时都在 AMQP 代理上声明。

AmqpTemplate 也定义在核心包中。 作为实际 AMQP 消息传递中涉及的主要组件之一,它在自己的部分中进行了详细讨论(请参阅 AmqpTemplate)。