AmqpTemplate
和 Spring 框架及相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个“模板
”,它发挥着核心作用。定义主要操作的接口被称为 AmqpTemplate
。这些操作涵盖发送和接收消息的一般行为。换句话说,它们对于任何实现来说都不是唯一的,因此名称中包含“AMQP
”。另一方面,该接口的一些实现与 AMQP 协议的实现相关。与作为接口级 API 本身的 JMS 不同,AMQP 是一个线级协议。该协议的实现提供了自己的客户端库,因此模板接口的每个实现都依赖于某个特定的客户端库。目前,只有一种实现:RabbitTemplate
。在随后的示例中,我们经常使用 AmqpTemplate
。但是,当你查看配置示例或任何实例化模板或调用 setter 的代码摘录时,你都可以看到实现类型(例如,RabbitTemplate
)。
As with many other high-level abstractions provided by the Spring Framework and related projects, Spring AMQP provides a “template” that plays a central role.
The interface that defines the main operations is called AmqpTemplate
.
Those operations cover the general behavior for sending and receiving messages.
In other words, they are not unique to any implementation — hence the “AMQP” in the name.
On the other hand, there are implementations of that interface that are tied to implementations of the AMQP protocol.
Unlike JMS, which is an interface-level API itself, AMQP is a wire-level protocol.
The implementations of that protocol provide their own client libraries, so each implementation of the template interface depends on a particular client library.
Currently, there is only a single implementation: RabbitTemplate
.
In the examples that follow, we often use an AmqpTemplate
.
However, when you look at the configuration examples or any code excerpts where the template is instantiated or setters are invoked, you can see the implementation type (for example, RabbitTemplate
).
如前文所述,AmqpTemplate
接口定义了用于发送和接收消息的所有基本操作。我们会在 Sending Messages 和 Receiving Messages 中分别探讨消息发送和接收。
As mentioned earlier, the AmqpTemplate
interface defines all the basic operations for sending and receiving messages.
We will explore message sending and reception, respectively, in Sending Messages and Receiving Messages.
另请参阅 Async Rabbit Template。
See also Async Rabbit Template.
Adding Retry Capabilities
从 1.3 版开始,您现在可以通过配置 RabbitTemplate
来使用 RetryTemplate
,以便帮助处理经纪人连接问题。要获得完整信息,请参阅 spring-retry 项目。以下是仅使用指数级退避策略和默认 SimpleRetryPolicy
的一个示例,在向调用方引发异常之前尝试三次。
Starting with version 1.3, you can now configure the RabbitTemplate
to use a RetryTemplate
to help with handling problems with broker connectivity.
See the spring-retry project for complete information.
The following is only one example that uses an exponential back off policy and the default SimpleRetryPolicy
, which makes three tries before throwing the exception to the caller.
以下示例使用 XML 命名空间:
The following example uses the XML namespace:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
以下示例在 Java 中使用 @Configuration
注解:
The following example uses the @Configuration
annotation in Java:
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
从 1.4 版开始,除了 retryTemplate
属性外,recoveryCallback
选项也在 RabbitTemplate
上受支持。它用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
的第二个参数。
Starting with version 1.4, in addition to the retryTemplate
property, the recoveryCallback
option is supported on the RabbitTemplate
.
It is used as a second argument for the RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
.
|
The |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在这种情况下,你不应向 RabbitTemplate
中注入 RetryTemplate
。
In this case, you would not inject a RetryTemplate
into the RabbitTemplate
.
Publishing is Asynchronous — How to Detect Successes and Failures
发布消息是一种异步机制,默认情况下,不能路由的消息会被 RabbitMQ 删除。对于成功的发布,您可以收到异步确认,如 Correlated Publisher Confirms and Returns 中所述。考虑两种失败情况:
Publishing messages is an asynchronous mechanism and, by default, messages that cannot be routed are dropped by RabbitMQ. For successful publishing, you can receive an asynchronous confirm, as described in Correlated Publisher Confirms and Returns. Consider two failure scenarios:
-
Publish to an exchange but there is no matching destination queue.
-
Publish to a non-existent exchange.
第一种情况由发布者退回内容涵盖,如 Correlated Publisher Confirms and Returns 中所述。
The first case is covered by publisher returns, as described in Correlated Publisher Confirms and Returns.
对于第二种情况,消息会被放弃,并且不会生成任何返回。底层通道会因异常而关闭。默认情况下,此异常会被记录,但你可以使用 CachingConnectionFactory
注册一个 ChannelListener
来获取此类事件的通知。以下示例展示了如何添加一个 ConnectionListener
:
For the second case, the message is dropped and no return is generated.
The underlying channel is closed with an exception.
By default, this exception is logged, but you can register a ChannelListener
with the CachingConnectionFactory
to obtain notifications of such events.
The following example shows how to add a ConnectionListener
:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
你可以检查信号的 reason
属性以确定发生的问题。
You can examine the signal’s reason
property to determine the problem that occurred.
若要在发送线程上检测异常,你可以将 RabbitTemplate
上的 setChannelTransacted(true)
设置为 true
,而且异常将在 txCommit()
上检测到。但是,事务会极大地影响性能,因此在启用事务只用于此用例之前请对此予以仔细考虑。
To detect the exception on the sending thread, you can setChannelTransacted(true)
on the RabbitTemplate
and the exception is detected on the txCommit()
.
However, transactions significantly impede performance, so consider this carefully before enabling transactions for just this one use case.
Correlated Publisher Confirms and Returns
AmqpTemplate
的 RabbitTemplate
实现支持发布者确认和返回。
The RabbitTemplate
implementation of AmqpTemplate
supports publisher confirms and returns.
对于已返回的消息,模板的 mandatory
属性必须设置为 true
或 mandatory-expression
针对特定消息评估为 true
。此功能需要一个 CachingConnectionFactory
,其 publisherReturns
属性设置为 true
(请参阅 Publisher Confirms and Returns)。退回内容通过调用 setReturnsCallback(ReturnsCallback callback)
由其注册 RabbitTemplate.ReturnsCallback
发送至客户端。回调必须实现以下方法:
For returned messages, the template’s mandatory
property must be set to true
or the mandatory-expression
must evaluate to true
for a particular message.
This feature requires a CachingConnectionFactory
that has its publisherReturns
property set to true
(see Publisher Confirms and Returns).
Returns are sent to the client by it registering a RabbitTemplate.ReturnsCallback
by calling setReturnsCallback(ReturnsCallback callback)
.
The callback must implement the following method:
void returnedMessage(ReturnedMessage returned);
ReturnedMessage
具有以下属性:
The ReturnedMessage
has the following properties:
-
message
- the returned message itself -
replyCode
- a code indicating the reason for the return -
replyText
- a textual reason for the return - e.g.NO_ROUTE
-
exchange
- the exchange to which the message was sent -
routingKey
- the routing key that was used
每个 RabbitTemplate
只支持一个 ReturnsCallback
。另请参阅 Reply Timeout。
Only one ReturnsCallback
is supported by each RabbitTemplate
.
See also Reply Timeout.
对于发布者确认(也称为发布者确认),模板需要一个 CachingConnectionFactory
,且其 publisherConfirm
属性设置为 ConfirmType.CORRELATED
。确认是通过调用 setConfirmCallback(ConfirmCallback callback)
注册 RabbitTemplate.ConfirmCallback
发送给客户机的。回调必须实现此方法:
For publisher confirms (also known as publisher acknowledgements), the template requires a CachingConnectionFactory
that has its publisherConfirm
property set to ConfirmType.CORRELATED
.
Confirms are sent to the client by it registering a RabbitTemplate.ConfirmCallback
by calling setConfirmCallback(ConfirmCallback callback)
.
The callback must implement this method:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData
是在发送原始消息时由客户机提供的对象。ack
对于一个 ack
为真,并且对于一个 nack
为假。对于 nack
实例,如果在生成 nack
时原因可用,则原因可能包含 nack
的原因。一个示例是在发送消息到不存在的交换时。在该情况下,代理会关闭通道。关闭原因包含在 cause
中。cause
在 1.4 版中添加。
The CorrelationData
is an object supplied by the client when sending the original message.
The ack
is true for an ack
and false for a nack
.
For nack
instances, the cause may contain a reason for the nack
, if it is available when the nack
is generated.
An example is when sending a message to a non-existent exchange.
In that case, the broker closes the channel.
The reason for the closure is included in the cause
.
The cause
was added in version 1.4.
一个 RabbitTemplate
仅支持一个 ConfirmCallback
。
Only one ConfirmCallback
is supported by a RabbitTemplate
.
当 Rabbit 模板发送操作完成时,通道关闭。当连接工厂缓存已满时,导致接收确认或返回失败(当缓存中有空间时,通道未物理关闭并且返回和确认正常进行)。当缓存已满时,框架将关闭延迟最多五秒,以便留出接收确认和返回的时间。使用确认时,在收到最后一个确认后关闭通道。仅使用返回时,通道在全部五秒内保持打开状态。我们通常建议设置连接工厂的 |
When a rabbit template send operation completes, the channel is closed.
This precludes the reception of confirms or returns when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns and confirms proceed normally).
When the cache is full, the framework defers the close for up to five seconds, in order to allow time for the confirms and returns to be received.
When using confirms, the channel is closed when the last confirm is received.
When using only returns, the channel remains open for the full five seconds.
We generally recommend setting the connection factory’s |
在 2.1 版之前,启用了发布者确认的通道在收到确认之前返回到缓存。某些其他进程可以签出通道并执行导致通道关闭的操作,例如将消息发布到不存在的交换。这可能会导致确认丢失。2.1 版及更高版本在有待处理的确认时不再将通道返回到缓存。RabbitTemplate
在每个操作后对通道执行一个逻辑 close()
。一般来说,这意味着一次只在一个通道上有待处理的一个确认。
Before version 2.1, channels enabled for publisher confirms were returned to the cache before the confirms were received.
Some other process could check out the channel and perform some operation that causes the channel to close — such as publishing a message to a non-existent exchange.
This could cause the confirm to be lost.
Version 2.1 and later no longer return the channel to the cache while confirms are outstanding.
The RabbitTemplate
performs a logical close()
on the channel after each operation.
In general, this means that only one confirm is outstanding on a channel at a time.
从 2.2 版开始,在连接工厂的 |
Starting with version 2.2, the callbacks are invoked on one of the connection factory’s |
只要返回回调在 60 秒或更短时间内执行,就仍可确保在确认之前收到返回消息。已经计划确认在返回回调退出或 60 秒后(以先到者为准)送达。
The guarantee of receiving a returned message before the ack is still maintained as long as the return callback executes in 60 seconds or less. The confirm is scheduled to be delivered after the return callback exits or after 60 seconds, whichever comes first.
CorrelationData
对象有一个 CompletableFuture
,你可以使用它来获取结果,而不是在模板上使用 ConfirmCallback
。以下示例展示了如何配置一个 CorrelationData
实例:
The CorrelationData
object has a CompletableFuture
that you can use to get the result, instead of using a ConfirmCallback
on the template.
The following example shows how to configure a CorrelationData
instance:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由于它是 CompletableFuture<Confirm>
,因此你可以在准备就绪时 get()
结果,或者使用 whenComplete()
进行异步回调。Confirm
对象是一个带有 2 个属性的简单 bean:ack
和 reason
(对于 nack
实例)。对于代理生成的 nack
实例,原因不会填充。它对于由框架生成的 nack
实例填充(例如,在 ack
实例未完成时关闭连接)。
Since it is a CompletableFuture<Confirm>
, you can either get()
the result when ready or use whenComplete()
for an asynchronous callback.
The Confirm
object is a simple bean with 2 properties: ack
and reason
(for nack
instances).
The reason is not populated for broker-generated nack
instances.
It is populated for nack
instances generated by the framework (for example, closing the connection while ack
instances are outstanding).
此外,当确认和返回都启用时,如果无法将 CorrelationData
return
属性路由到任何队列,则其 return
属性将填充为已返回的消息。确保在使用 ack
设置 future 之前设置返回消息属性。CorrelationData.getReturn()
返回一个 ReturnMessage
,其具有以下属性:
In addition, when both confirms and returns are enabled, the CorrelationData
return
property is populated with the returned message, if it couldn’t be routed to any queue.
It is guaranteed that the returned message property is set before the future is set with the ack
.
CorrelationData.getReturn()
returns a ReturnMessage
with properties:
-
message (the returned message)
-
replyCode
-
replyText
-
exchange
-
routingKey
另请参阅 Scoped Operations,以了解用于等待发布者确认的更简单的机制。
See also Scoped Operations for a simpler mechanism for waiting for publisher confirms.
Scoped Operations
通常情况下,在使用模板时,将从缓存中签出(或创建)“通道”,用于操作,然后将其返回给缓存以供重用。在多线程环境中,无法保证下一个操作使用相同的通道。然而,有时你想要更多地控制通道的使用,并确保在同一个通道上执行多个操作。
Normally, when using the template, a Channel
is checked out of the cache (or created), used for the operation, and returned to the cache for reuse.
In a multi-threaded environment, there is no guarantee that the next operation uses the same channel.
There may be times, however, where you want to have more control over the use of a channel and ensure that a number of operations are all performed on the same channel.
从 2.0 版开始,提供了一个名为 invoke
的新方法,并带有 OperationsCallback
。在回调范围内执行在提供的 RabbitOperations
参数上执行的任何操作均使用相同的专用 Channel
,该 Channel
将在最后关闭(不会返回到缓存)。如果频道为 PublisherCallbackChannel
,则在收到所有确认后将其返回到缓存(请参阅 Correlated Publisher Confirms and Returns)。
Starting with version 2.0, a new method called invoke
is provided, with an OperationsCallback
.
Any operations performed within the scope of the callback and on the provided RabbitOperations
argument use the same dedicated Channel
, which will be closed at the end (not returned to a cache).
If the channel is a PublisherCallbackChannel
, it is returned to the cache after all confirms have been received (see Correlated Publisher Confirms and Returns).
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
你可能需要这样做的一个示例是,如果你希望对底层 通道
使用 waitForConfirms()
方法。如前所述,此方法以前未在 Spring API 中暴露,因为通道通常会被缓存并共享。RabbitTemplate
现在提供 waitForConfirms(long timeout)
和 waitForConfirmsOrDie(long timeout)
,它们委托给 OperationsCallback
范围中使用的专用通道。显然,这些方法不能在该范围之外使用。
One example of why you might need this is if you wish to use the waitForConfirms()
method on the underlying Channel
.
This method was not previously exposed by the Spring API because the channel is, generally, cached and shared, as discussed earlier.
The RabbitTemplate
now provides waitForConfirms(long timeout)
and waitForConfirmsOrDie(long timeout)
, which delegate to the dedicated channel used within the scope of the OperationsCallback
.
The methods cannot be used outside of that scope, for obvious reasons.
请注意,在其他位置会提供一种更高级别的抽象,它允许您将确认与请求相关联(请参阅 Correlated Publisher Confirms and Returns)。如果您只想等到经纪人确认交付,可以使用以下示例中所示的技术:
Note that a higher-level abstraction that lets you correlate confirms to requests is provided elsewhere (see Correlated Publisher Confirms and Returns). If you want only to wait until the broker has confirmed delivery, you can use the technique shown in the following example:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果你希望在 OperationsCallback
范围内针对 RabbitAdmin
操作调用相同通道,则必须使用与 invoke
操作相同的 RabbitTemplate
构建管理员。
If you wish RabbitAdmin
operations to be invoked on the same channel within the scope of the OperationsCallback
, the admin must have been constructed by using the same RabbitTemplate
that was used for the invoke
operation.
如果已经模板操作在现有事务范围中执行,则上述讨论是无关紧要的,例如,当在已执行监听器容器线程上运行,并在已执行模板上执行操作时。在这种情况下,操作在该通道上执行,并在线程返回到容器时提交。在该场景中,无需使用 |
The preceding discussion is moot if the template operations are already performed within the scope of an existing transaction — for example, when running on a transacted listener container thread and performing operations on a transacted template.
In that case, the operations are performed on that channel and committed when the thread returns to the container.
It is not necessary to use |
以这种方式使用确认时,为将确认与请求关联而设置的大部分基础架构实际上并不是必需的(除非返回也已启用)。从版本 2.2 开始,连接工厂支持一个名为 publisherConfirmType
的新属性。当它被设置为 ConfirmType.SIMPLE
时,将避免使用该基础架构,并且确认处理的效率会更高。
When using confirms in this way, much of the infrastructure set up for correlating confirms to requests is not really needed (unless returns are also enabled).
Starting with version 2.2, the connection factory supports a new property called publisherConfirmType
.
When this is set to ConfirmType.SIMPLE
, the infrastructure is avoided and the confirm processing can be more efficient.
此外,RabbitTemplate
在已发送的消息 MessageProperties
中设置 publisherSequenceNumber
属性。如果你希望检查(或记录或以其他方式使用)特定确认,则可以使用重载的 invoke
方法执行此操作,如下例所示:
Furthermore, the RabbitTemplate
sets the publisherSequenceNumber
property in the sent message MessageProperties
.
If you wish to check (or log or otherwise use) specific confirms, you can do so with an overloaded invoke
method, as the following example shows:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
这些 |
These |
以下示例记录 ack
和 nack
实例:
The following example logs ack
and nack
instances:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
作用域操作绑定到一个线程。请参阅 Strict Message Ordering in a Multi-Threaded Environment 了解多线程环境中严格顺序的讨论。
Scoped operations are bound to a thread. See Strict Message Ordering in a Multi-Threaded Environment for a discussion about strict ordering in a multi-threaded environment.
Strict Message Ordering in a Multi-Threaded Environment
Scoped Operations 中的讨论仅适用于在同一线程上执行操作的情况。
The discussion in Scoped Operations applies only when the operations are performed on the same thread.
考虑以下情况:
Consider the following situation:
-
thread-1
sends a message to a queue and hands off work tothread-2
-
thread-2
sends a message to the same queue
由于 RabbitMQ 具有异步性质且使用了缓存的频道,因此无法确定将使用相同的频道,从而无法保证消息到达队列的顺序。(在大多数情况下,它们会按顺序到达,但乱序交付的可能性不为零)。为解决此用例,您可以将具有大小 1
的有界频道缓存(以及 channelCheckoutTimeout
)结合使用,以确保始终在同一频道上发布消息,并且保证顺序。为此,如果您有其他用途的连接工厂(例如使用者),则应为模板使用专用的连接工厂,或将模板配置为使用嵌入在主连接工厂中的发布者连接工厂(请参阅 Using a Separate Connection)。
Because of the async nature of RabbitMQ and the use of cached channels; it is not certain that the same channel will be used and therefore the order in which the messages arrive in the queue is not guaranteed.
(In most cases they will arrive in order, but the probability of out-of-order delivery is not zero).
To solve this use case, you can use a bounded channel cache with size 1
(together with a channelCheckoutTimeout
) to ensure the messages are always published on the same channel, and order will be guaranteed.
To do this, if you have other uses for the connection factory, such as consumers, you should either use a dedicated connection factory for the template, or configure the template to use the publisher connection factory embedded in the main connection factory (see Using a Separate Connection).
通过一个简单的 Spring Boot 应用程序最能说明这一点:
This is best illustrated with a simple Spring Boot Application:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使发布是在两个不同的线程上执行的,它们都将使用相同的通道,因为缓存上限为一个通道。
Even though the publishing is performed on two different threads, they will both use the same channel because the cache is capped at a single channel.
从版本 2.3.7 开始,ThreadChannelConnectionFactory
支持使用 prepareContextSwitch
和 switchContext
方法将线程的通道传输到另一个线程。第一个方法返回一个上下文,传递给调用第二个方法的第二个线程。一个线程可以有一个非事务通道或一个事务通道(或每个通道一个);除非你使用两个连接工厂,否则你无法单独传输它们。示例如下:
Starting with version 2.3.7, the ThreadChannelConnectionFactory
supports transferring a thread’s channel(s) to another thread, using the prepareContextSwitch
and switchContext
methods.
The first method returns a context which is passed to the second thread which calls the second method.
A thread can have either a non-transactional channel or a transactional channel (or one of each) bound to it; you cannot transfer them individually, unless you use two connection factories.
An example follows:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦调用 prepareSwitchContext
,如果当前线程执行任何其他操作,则这些操作将在新通道上执行。当不再需要线程绑定的通道时,关闭该通道非常重要。
Once the prepareSwitchContext
is called, if the current thread performs any more operations, they will be performed on a new channel.
It is important to close the thread-bound channel when it is no longer needed.
Messaging Integration
从 1.4 版本开始,RabbitMessagingTemplate
(构建在 RabbitTemplate
上)提供与 Spring Framework 消息抽象(即 org.springframework.messaging.Message
)的集成。这使你可以通过使用 spring-messaging
Message<?>
抽象来发送和接收消息。其他 Spring 项目(例如 Spring Integration 和 Spring 的 STOMP 支持)会使用此抽象。涉及两个消息转换器:一个用于在 spring-messaging Message<?>
和 Spring AMQP 的 Message
抽象之间转换,另一个用于在 Spring AMQP 的 Message
抽象和底层 RabbitMQ 客户端库所需的格式之间转换。默认情况下,消息负载由提供的 RabbitTemplate
实例的消息转换器转换。或者,你可以注入一个带有其他负载转换器的自定义 MessagingMessageConverter
,如下例所示:
Starting with version 1.4, RabbitMessagingTemplate
(built on top of RabbitTemplate
) provides an integration with the Spring Framework messaging abstraction — that is,
org.springframework.messaging.Message
.
This lets you send and receive messages by using the spring-messaging
Message<?>
abstraction.
This abstraction is used by other Spring projects, such as Spring Integration and Spring’s STOMP support.
There are two message converters involved: one to convert between a spring-messaging Message<?>
and Spring AMQP’s Message
abstraction and one to convert between Spring AMQP’s Message
abstraction and the format required by the underlying RabbitMQ client library.
By default, the message payload is converted by the provided RabbitTemplate
instance’s message converter.
Alternatively, you can inject a custom MessagingMessageConverter
with some other payload converter, as the following example shows:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
Validated User Id
从 1.6 版本开始,现在,模板支持 user-id-expression
(在使用 Java 配置时为 userIdExpression
)。如果消息已发送,则在评估此表达式后设置用户 ID 属性(如果尚未设置)。评估的根对象是要发送的消息。
Starting with version 1.6, the template now supports a user-id-expression
(userIdExpression
when using Java configuration).
If a message is sent, the user id property is set (if not already set) after evaluating this expression.
The root object for the evaluation is the message to be sent.
以下示例展示了如何使用 user-id-expression
属性:
The following examples show how to use the user-id-expression
attribute:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个示例是一个文本表达式。第二个示例从应用程序上下文中获取连接工厂 bean 的 username
属性。
The first example is a literal expression.
The second obtains the username
property from a connection factory bean in the application context.
Using a Separate Connection
从版本 2.0.2 开始,你可以将 usePublisherConnection
属性设置为 true
,以便在可能的情况下使用与侦听器容器不同的连接。这样做是为了避免在生产者因任何原因阻塞时消费者被阻塞。为此,连接工厂维护了一个第二个内部连接工厂;默认情况下,它与主工厂的类型相同,但如果你希望为发布使用不同的工厂类型,则可以显式设置它。如果兔子模板在侦听器容器启动的事务中运行,那么无论此设置如何,都会使用容器的通道。
Starting with version 2.0.2, you can set the usePublisherConnection
property to true
to use a different connection to that used by listener containers, when possible.
This is to avoid consumers being blocked when a producer is blocked for any reason.
The connection factories maintain a second internal connection factory for this purpose; by default it is the same type as the main factory, but can be set explicitly if you wish to use a different factory type for publishing.
If the rabbit template is running in a transaction started by the listener container, the container’s channel is used, regardless of this setting.
一般来说,你不应该使用属性设置为 true
的模板 RabbitAdmin
。使用接收连接工厂的 RabbitAdmin
构造函数。如果你使用接收模板的其他构造函数,请确保模板属性为 false
。这是因为通常使用管理员声明侦听器容器的队列。使用属性设置为 true
的模板意味着独占队列(例如 AnonymousQueue
)将在与侦听器容器使用的连接不同的连接上声明。在该情况下,队列不能被容器使用。
In general, you should not use a RabbitAdmin
with a template that has this set to true
.
Use the RabbitAdmin
constructor that takes a connection factory.
If you use the other constructor that takes a template, ensure the template’s property is false
.
This is because, often, an admin is used to declare queues for listener containers.
Using a template that has the property set to true
would mean that exclusive queues (such as AnonymousQueue
) would be declared on a different connection to that used by listener containers.
In that case, the queues cannot be used by the containers.