接收消息
本文描述了如何在 Spring 中使用 JMS 接收消息。
同步接收
虽然 JMS 通常与异步处理相关联,但你也可以同步消费消息。重载的 receive(..)
方法提供了此功能。在同步接收期间,调用线程会阻塞,直到有消息可用。这可能是一个危险的操作,因为调用线程可能会无限期地阻塞。receiveTimeout
属性指定了接收方在放弃等待消息之前应该等待多长时间。
异步接收:消息驱动的 POJO
Spring 还通过使用 |
与 EJB 世界中的消息驱动 Bean (MDB) 类似,消息驱动 POJO (MDP) 充当 JMS 消息的接收器。对 MDP 的一个限制(但请参阅 使用 MessageListenerAdapter
)是它必须实现 jakarta.jms.MessageListener
接口。请注意,如果你的 POJO 在多个线程上接收消息,则确保你的实现是线程安全的非常重要。
以下示例展示了 MDP 的简单实现:
一旦你实现了 MessageListener
,就该创建消息监听器容器了。
以下示例展示了如何定义和配置 Spring 附带的消息监听器容器之一(在本例中为 DefaultMessageListenerContainer
):
有关每个实现支持的功能的完整描述,请参阅各种消息监听器容器的 Spring javadoc(所有这些容器都实现了 MessageListenerContainer)。
使用 SessionAwareMessageListener
接口
SessionAwareMessageListener
接口是一个 Spring 特定的接口,它提供了与 JMS MessageListener
接口类似的契约,但同时也允许消息处理方法访问接收到 Message
的 JMS Session
。以下列表显示了 SessionAwareMessageListener
接口的定义:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望 MDP 能够响应任何接收到的消息(通过使用 onMessage(Message, Session)
方法中提供的 Session
),你可以选择让你的 MDP 实现此接口(优先于标准的 JMS MessageListener
接口)。Spring 附带的所有消息监听器容器实现都支持实现 MessageListener
或 SessionAwareMessageListener
接口的 MDP。实现 SessionAwareMessageListener
的类附带的警告是它们通过接口与 Spring 绑定。是否使用它的选择完全取决于你作为应用程序开发人员或架构师。
请注意,SessionAwareMessageListener
接口的 onMessage(..)
方法抛出 JMSException
。与标准 JMS MessageListener
接口相反,当使用 SessionAwareMessageListener
接口时,处理任何抛出的异常是客户端代码的责任。
使用 MessageListenerAdapter
MessageListenerAdapter
类是 Spring 异步消息支持中的最后一个组件。简而言之,它允许你将几乎任何类公开为 MDP(尽管有一些限制)。
考虑以下接口定义:
请注意,尽管该接口既不扩展 MessageListener
也不扩展 SessionAwareMessageListener
接口,但你仍然可以通过使用 MessageListenerAdapter
类将其用作 MDP。还要注意各种消息处理方法如何根据它们可以接收和处理的各种 Message
类型的内容进行强类型化。
现在考虑 MessageDelegate
接口的以下实现:
特别要注意,MessageDelegate
接口的上述实现(DefaultMessageDelegate
类)根本没有任何 JMS 依赖。它确实是一个 POJO,我们可以通过以下配置将其转换为 MDP:
下一个示例展示了另一个只能接收 JMS TextMessage
消息的 MDP。请注意,消息处理方法实际上称为 receive
(MessageListenerAdapter
中的消息处理方法名称默认为 handleMessage
),但它是可配置的(如本节后面所示)。还要注意 receive(..)
方法如何强类型化为只接收和响应 JMS TextMessage
消息。以下列表显示了 TextMessageDelegate
接口的定义:
以下列表显示了一个实现 TextMessageDelegate
接口的类:
随之而来的 MessageListenerAdapter
的配置将如下所示:
请注意,如果 messageListener
接收到 TextMessage
以外的 JMS Message
类型,则会抛出 IllegalStateException
(并随后被吞噬)。MessageListenerAdapter
类的另一个功能是,如果处理程序方法返回非 void 值,则能够自动发送回响应 Message
。考虑以下接口和类:
如果你将 DefaultResponsiveTextMessageDelegate
与 MessageListenerAdapter
结合使用,则从执行 receive(..)
方法返回的任何非空值(在默认配置中)都会转换为 TextMessage
。然后,生成的 TextMessage
会发送到原始 Message
的 JMS Reply-To
属性中定义的 Destination
(如果存在),或者发送到 MessageListenerAdapter
上设置的默认 Destination
(如果已配置)。如果没有找到 Destination
,则会抛出 InvalidDestinationException
(请注意,此异常不会被吞噬并会沿着调用堆栈传播)。
在事务中处理消息
在事务中调用消息监听器只需要重新配置监听器容器。
你可以通过监听器容器定义上的 sessionTransacted
标志来激活本地资源事务。然后,每个消息监听器调用都在一个活动的 JMS 事务中操作,如果监听器执行失败,消息接收将回滚。发送响应消息(通过 SessionAwareMessageListener
)是同一本地事务的一部分,但任何其他资源操作(例如数据库访问)独立操作。这通常要求监听器实现中的重复消息检测,以涵盖数据库处理已提交但消息处理未能提交的情况。
考虑以下 bean 定义:
要参与外部管理的事务,你需要配置事务管理器并使用支持外部管理事务的监听器容器(通常是 DefaultMessageListenerContainer
)。
要为 XA 事务参与配置消息监听器容器,你需要配置 JtaTransactionManager
(它默认委托给 Jakarta EE 服务器的事务子系统)。请注意,底层 JMS ConnectionFactory
需要支持 XA 并正确注册到你的 JTA 事务协调器。(检查你的 Jakarta EE 服务器的 JNDI 资源配置。)这允许消息接收以及(例如)数据库访问成为同一事务的一部分(具有统一的提交语义,但以 XA 事务日志开销为代价)。
以下 bean 定义创建了一个事务管理器:
然后我们需要将其添加到我们之前的容器配置中。容器会处理其余部分。以下示例展示了如何执行此操作: