发送消息
发送消息时,可以使用以下任一方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前面列表中最后一个方法开始讨论,因为它实际上是最明确的。
它允许在运行时提供 AMQP 交换名称(以及路由键)。
最后一个参数是负责实际创建消息实例的回调。
使用此方法发送消息的示例如下所示:
以下示例展示了如何使用 send
方法发送消息:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
如果您打算使用该模板实例大部分或全部时间发送到同一个交换机,则可以在模板本身上设置 exchange
属性。
在这种情况下,您可以使用前面列表中的第二个方法。
以下示例在功能上等同于上一个示例:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果模板上同时设置了 exchange
和 routingKey
属性,则可以使用只接受 Message
的方法。
以下示例展示了如何实现:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
更好地理解交换机和路由键属性的方法是,显式方法参数总是覆盖模板的默认值。
事实上,即使您没有在模板上显式设置这些属性,也总会有默认值存在。
在这两种情况下,默认值都是空 String
,但这实际上是一个合理的默认值。
就路由键而言,它首先并非总是必需的(例如,对于 Fanout
交换机)。
此外,队列可以绑定到带有空 String
的交换机。
这些都是依赖模板的路由键属性的默认空 String
值的合法场景。
就交换机名称而言,空 String
常用,因为 AMQP 规范将“默认交换机
”定义为没有名称。
由于所有队列都自动绑定到该默认交换机(这是一个直连交换机),并使用它们的名称作为绑定值,因此前面列表中的第二个方法可以用于通过默认交换机向任何队列进行简单的点对点消息传递。
您可以将队列名称作为 routingKey
提供,方法是在运行时提供方法参数。
以下示例展示了如何实现:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,您可以创建一个主要或专门用于发布到单个队列的模板。 以下示例展示了如何实现:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息构建器 API
从 1.3 版本开始,提供了 MessageBuilder
和 MessagePropertiesBuilder
提供的消息构建器 API。
这些方法提供了一种方便的“流畅
”方式来创建消息或消息属性。
以下示例展示了流畅 API 的实际应用:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
{spring-amqp-java-docs}/core/MessageProperties.html[MessageProperties
] 上定义的每个属性都可以设置。
其他方法包括 setHeader(String key, String value)
、removeHeader(String key)
、removeHeaders()
和 copyProperties(MessageProperties properties)
。
每个属性设置方法都有一个 set*IfAbsent()
变体。
在存在默认初始值的情况下,该方法名为 set*IfAbsentOrDefault()
。
提供了五个静态方法来创建初始消息构建器:
public static MessageBuilder withBody(byte[] body) [id="CO1-1"]1
public static MessageBuilder withClonedBody(byte[] body) [id="CO1-2"]2
public static MessageBuilder withBody(byte[] body, int from, int to) [id="CO1-3"]3
public static MessageBuilder fromMessage(Message message) [id="CO1-4"]4
public static MessageBuilder fromClonedMessage(Message message) [id="CO1-5"]5
<1> 由构建器创建的消息的 body 是对参数的直接引用。 <1> 由构建器创建的消息的 body 是一个新数组,其中包含参数中字节的副本。 <1> 由构建器创建的消息的 body 是一个新数组,其中包含参数中字节范围的副本。 有关更多详细信息,请参阅 link:https://docs.oracle.com/javase/7/docs/api/java/util/Arrays.html[`Arrays.copyOfRange()`]。 <1> 由构建器创建的消息的 body 是对参数 body 的直接引用。 参数的属性被复制到一个新的 `MessageProperties` 对象中。 <1> 由构建器创建的消息的 body 是一个新数组,其中包含参数 body 的副本。 参数的属性被复制到一个新的 `MessageProperties` 对象中。
提供了三个静态方法来创建 MessagePropertiesBuilder
实例:
public static MessagePropertiesBuilder newInstance() [id="CO2-1"]1
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) [id="CO2-2"]2
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) [id="CO2-3"]3
<1> 一个新的消息属性对象使用默认值进行初始化。 <1> 构建器使用提供的属性对象进行初始化,并且 `build()` 将返回该对象。 <1> 参数的属性被复制到一个新的 `MessageProperties` 对象中。
对于 AmqpTemplate
的 RabbitTemplate
实现,每个 send()
方法都有一个重载版本,它接受一个额外的 CorrelationData
对象。
当发布者确认启用时,此对象将在 AmqpTemplate
中描述的回调中返回。
这允许发送方将确认(ack
或 nack
)与发送的消息关联起来。
从 1.6.7 版本开始,引入了 CorrelationAwareMessagePostProcessor
接口,允许在消息转换后修改关联数据。
以下示例展示了如何使用它:
Message postProcessMessage(Message message, Correlation correlation);
在 2.0 版本中,此接口已弃用。
该方法已移至 MessagePostProcessor
,并带有一个委托给 postProcessMessage(Message message)
的默认实现。
同样从 1.6.7 版本开始,提供了一个名为 CorrelationDataPostProcessor
的新回调接口。
它在所有 MessagePostProcessor
实例(在 send()
方法中提供以及在 setBeforePublishPostProcessors()
中提供)之后调用。
实现可以更新或替换 send()
方法中提供的关联数据(如果有)。
Message
和原始 CorrelationData
(如果有)作为参数提供。
以下示例展示了如何使用 postProcess
方法:
CorrelationData postProcess(Message message, CorrelationData correlationData);
发布者返回
当模板的 mandatory
属性为 true
时,返回的消息由 AmqpTemplate
中描述的回调提供。
从 1.4 版本开始,RabbitTemplate
支持 SpEL mandatoryExpression
属性,该属性针对每个请求消息作为根评估对象进行评估,解析为 boolean
值。
表达式中可以使用 Bean 引用,例如 @myBean.isMandatory(#root)
。
发布者返回也可以由 RabbitTemplate
在发送和接收操作中内部使用。
有关更多信息,请参阅 回复超时。
批处理
1.4.2 版本引入了 BatchingRabbitTemplate
。
它是 RabbitTemplate
的子类,带有一个重写的 send
方法,该方法根据 BatchingStrategy
批量处理消息。
只有当批次完成时,消息才会被发送到 RabbitMQ。
以下列表显示了 BatchingStrategy
接口定义:
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
批处理数据保存在内存中。 如果系统发生故障,未发送的消息可能会丢失。
提供了 SimpleBatchingStrategy
。
它支持向单个交换机或路由键发送消息。
它具有以下属性:
-
batchSize
:批次中在发送前包含的消息数量。 -
bufferLimit
:批处理消息的最大大小。 如果超出此限制,它会抢占batchSize
,并导致发送部分批次。 -
timeout
:在没有新的活动将消息添加到批次时,部分批次发送后的时间。
SimpleBatchingStrategy
通过在每个嵌入消息前面加上一个四字节的二进制长度来格式化批次。
通过将 springBatchFormat
消息属性设置为 lengthHeader4
,将其传达给接收系统。
批处理消息默认由监听器容器自动解批(通过使用 springBatchFormat
消息头)。
拒绝批次中的任何消息都会导致整个批次被拒绝。
但是,有关更多信息,请参阅 @RabbitListener with Batching。