使用 Spring JMS

本节介绍如何使用 Spring 的 JMS 组件。

JmsTemplateJmsClient

JmsTemplate 类是 JMS 核心包中的核心类。它简化了 JMS 的使用,因为它在发送或同步接收消息时处理资源的创建和释放。

JmsClient 是 Spring Framework 7.0 中的一个新 API 变体,遵循 JdbcClient 及类似的设计。JmsClient 基于 JmsTemplate 构建,用于直接的发送和接收操作,并提供每个操作的自定义选项。

使用 JmsTemplate

使用 JmsTemplate 的代码只需实现回调接口,这些接口为其提供了清晰定义的高级契约。MessageCreator 回调接口在给定 JmsTemplate 中由调用代码提供的 Session 时创建消息。为了允许更复杂的 JMS API 使用,SessionCallback 提供了 JMS 会话,ProducerCallback 暴露了 SessionMessageProducer 对。

JMS API 暴露了两种类型的发送方法,一种接受传输模式、优先级和生存时间作为服务质量 (QOS) 参数,另一种不接受 QOS 参数并使用默认值。由于 JmsTemplate 有许多发送方法,因此将 QOS 参数设置为 bean 属性,以避免发送方法数量的重复。类似地,同步接收调用的超时值通过 setReceiveTimeout 属性设置。

一些 JMS 提供商允许通过 ConnectionFactory 的配置来管理性地设置默认 QOS 值。这导致对 MessageProducer 实例的 send 方法 (send(Destination destination, Message message)) 的调用使用的 QOS 默认值与 JMS 规范中指定的不同。为了提供一致的 QOS 值管理,JmsTemplate 必须通过将布尔属性 isExplicitQosEnabled 设置为 true 来专门启用使用其自己的 QOS 值。

为了方便起见,JmsTemplate 还暴露了一个基本的请求-回复操作,允许发送消息并在作为操作一部分创建的临时队列上等待回复。

JmsTemplate 类的实例在配置后是线程安全的。这很重要,因为它意味着您可以配置一个 JmsTemplate 的单个实例,然后安全地将此共享引用注入到多个协作者中。需要明确的是,JmsTemplate 是有状态的,因为它维护对 ConnectionFactory 的引用,但此状态不是会话状态。

使用 JmsClient

从 Spring Framework 4.1 开始,JmsMessagingTemplate 构建在 JmsTemplate 之上,并提供了与 Spring 通用消息抽象的集成——也就是说,处理 org.springframework.messaging.Message 进行发送和接收,抛出 org.springframework.messaging.MessagingException,并且有效负载转换通过 org.springframework.messaging.converter.MessageConverter 进行(有许多常见的转换器实现可用)。

从 Spring Framework 7.0 开始,提供了一个名为 JmsClient 的流畅 API。这提供了围绕 org.springframework.messaging.Message 的可定制操作并抛出 org.springframework.messaging.MessagingException,类似于 JmsMessagingTemplate,以及与 org.springframework.messaging.converter.MessageConverter 的集成。JmsClient 可以为给定的 ConnectionFactory 或给定的 JmsTemplate 创建,在后一种情况下默认重用其设置。有关使用示例,请参阅 JmsClient

连接

JmsTemplate 需要引用 ConnectionFactoryConnectionFactory 是 JMS 规范的一部分,并作为使用 JMS 的入口点。客户端应用程序将其用作工厂,以创建与 JMS 提供商的连接,并封装各种配置参数,其中许多是供应商特定的,例如 SSL 配置选项。

在 EJB 中使用 JMS 时,供应商提供 JMS 接口的实现,以便它们可以参与声明性事务管理并执行连接和会话的池化。为了在 EJB 中将此实现与 JmsTemplate 一起使用,客户端应用程序应确保它引用 ConnectionFactory 的托管实现。Jakarta EE 容器通常要求您在 EJB 或 servlet 部署描述符中将 JMS 连接工厂声明为 resource-ref

缓存消息资源

标准 API 涉及创建许多中间对象。要发送消息,执行以下“API”步骤:

ConnectionFactory->Connection->Session->MessageProducer->send

ConnectionFactorySend 操作之间,创建并销毁了三个中间对象。为了优化资源使用并提高性能,Spring 提供了两种 ConnectionFactory 实现。

使用 SingleConnectionFactory

Spring 提供了 ConnectionFactory 接口的实现,SingleConnectionFactory,它在所有 createConnection() 调用上返回相同的 Connection 并忽略对 close() 的调用。这对于测试和独立环境很有用,以便同一连接可以用于可能跨任意数量事务的多个 JmsTemplate 调用。SingleConnectionFactory 引用了一个通常来自 JNDI 的标准 ConnectionFactory

使用 CachingConnectionFactory

CachingConnectionFactory 扩展了 SingleConnectionFactory 的功能,并添加了 SessionMessageProducerMessageConsumer 实例的缓存。初始缓存大小设置为 1。您可以使用 sessionCacheSize 属性来增加缓存会话的数量。请注意,实际缓存的会话数量多于该数字,因为会话是根据其确认模式进行缓存的,因此当 sessionCacheSize 设置为一时,最多可以有四个缓存会话实例(每种确认模式一个)。MessageProducerMessageConsumer 实例在其拥有的会话中进行缓存,并且在缓存时也会考虑生产者和消费者的独特属性。MessageProducer 根据其目标进行缓存。MessageConsumer 根据由目标、选择器、noLocal 传递标志和持久订阅名称(如果创建持久消费者)组成的键进行缓存。

临时队列和主题 (TemporaryQueue/TemporaryTopic) 的 MessageProducers 和 MessageConsumers 永远不会被缓存。不幸的是,WebLogic JMS 碰巧在其常规目标实现上实现了临时队列/主题接口,错误地指示其所有目标都无法缓存。请在 WebLogic 上使用不同的连接池/缓存,或为 WebLogic 目的自定义 CachingConnectionFactory

目标管理

目标,作为 ConnectionFactory 实例,是 JMS 管理对象,您可以将其存储和检索在 JNDI 中。配置 Spring 应用程序上下文时,您可以使用 JNDI JndiObjectFactoryBean 工厂类或 <jee:jndi-lookup> 对对象的 JMS 目标引用执行依赖注入。然而,如果应用程序中存在大量目标,或者 JMS 提供商具有独特的高级目标管理功能,则此策略通常很麻烦。此类高级目标管理的示例包括创建动态目标或支持分层目标命名空间。JmsTemplate 将目标名称的解析委托给实现 DestinationResolver 接口的 JMS 目标对象。DynamicDestinationResolverJmsTemplate 使用的默认实现,并适应解析动态目标。还提供了 JndiDestinationResolver,用作 JNDI 中包含的目标的服务定位器,并可选地回退到 DynamicDestinationResolver 中包含的行为。

通常,JMS 应用程序中使用的目标仅在运行时已知,因此在应用程序部署时无法管理性地创建。这通常是因为交互系统组件之间存在共享应用程序逻辑,这些组件根据众所周知的命名约定在运行时创建目标。尽管动态目标的创建不是 JMS 规范的一部分,但大多数供应商都提供了此功能。动态目标使用用户定义的名称创建,这使其与临时目标区分开来,并且通常未在 JNDI 中注册。用于创建动态目标的 API 因提供商而异,因为与目标关联的属性是供应商特定的。然而,供应商有时会做出一个简单的实现选择,即忽略 JMS 规范中的警告,并使用 TopicSession createTopic(String topicName) 方法或 QueueSession createQueue(String queueName) 方法创建一个具有默认目标属性的新目标。根据供应商实现,DynamicDestinationResolver 也可以创建物理目标,而不仅仅是解析一个。

布尔属性 pubSubDomain 用于配置 JmsTemplate,使其了解正在使用的 JMS 域。默认情况下,此属性的值为 false,表示将使用点对点域 Queues。此属性(由 JmsTemplate 使用)通过 DestinationResolver 接口的实现确定动态目标解析的行为。

您还可以通过属性 defaultDestinationJmsTemplate 配置默认目标。默认目标用于不引用特定目标的发送和接收操作。

消息监听器容器

在 EJB 世界中,JMS 消息最常见的用途之一是驱动消息驱动 Bean (MDB)。Spring 提供了一种创建消息驱动 POJO (MDP) 的解决方案,这种方式不会将用户绑定到 EJB 容器。(有关 Spring MDP 支持的详细介绍,请参阅 异步接收:消息驱动 POJO。)端点方法可以用 @JmsListener 进行注解——有关更多详细信息,请参阅 注解驱动的监听器端点

消息监听器容器用于从 JMS 消息队列接收消息并驱动注入其中的 MessageListener。监听器容器负责所有消息接收的线程化并将消息分派到监听器进行处理。消息监听器容器是 MDP 和消息提供商之间的中介,负责注册以接收消息、参与事务、资源获取和释放、异常转换等等。这使您可以编写与接收消息(并可能对其进行响应)相关的(可能复杂的)业务逻辑,并将样板 JMS 基础设施问题委托给框架。

Spring 提供了两个标准的 JMS 消息监听器容器,每个容器都有其专门的功能集。

使用 SimpleMessageListenerContainer

此消息监听器容器是两种标准类型中较简单的一种。它在启动时创建固定数量的 JMS 会话和消费者,使用标准的 JMS MessageConsumer.setMessageListener() 方法注册监听器,并由 JMS 提供商执行监听器回调。此变体不允许动态适应运行时需求或参与外部管理的事务。从兼容性角度来看,它非常接近独立 JMS 规范的精神,但通常与 Jakarta EE 的 JMS 限制不兼容。

虽然 SimpleMessageListenerContainer 不允许参与外部管理的事务,但它确实支持原生 JMS 事务。要启用此功能,您可以将 sessionTransacted 标志切换为 true,或者在 XML 命名空间中,将 acknowledge 属性设置为 transacted。从您的监听器抛出的异常将导致回滚,消息将被重新传递。或者,考虑使用 CLIENT_ACKNOWLEDGE 模式,它在发生异常时也提供重新传递,但它不使用事务性 Session 实例,因此不包括事务协议中的任何其他 Session 操作(例如发送响应消息)。

默认的 AUTO_ACKNOWLEDGE 模式不提供适当的可靠性保证。当监听器执行失败(因为提供商在监听器调用后自动确认每条消息,没有异常传播到提供商)或监听器容器关闭(您可以通过设置 acceptMessagesWhileStopping 标志来配置此项)时,消息可能会丢失。在需要可靠性(例如,用于可靠的队列处理和持久主题订阅)的情况下,请确保使用事务性会话。

使用 DefaultMessageListenerContainer

此消息监听器容器在大多数情况下使用。与 SimpleMessageListenerContainer 相比,此容器变体允许动态适应运行时需求,并且能够参与外部管理的事务。当配置了 JtaTransactionManager 时,每条收到的消息都用 XA 事务注册。因此,处理可以利用 XA 事务语义。此监听器容器在对 JMS 提供商的低要求、高级功能(例如参与外部管理的事务)以及与 Jakarta EE 环境的兼容性之间取得了良好的平衡。

您可以自定义容器的缓存级别。请注意,当未启用缓存时,每次消息接收都会创建一个新的连接和新的会话。将其与高负载下的非持久订阅结合使用可能会导致消息丢失。在这种情况下,请确保使用适当的缓存级别。

当代理宕机时,此容器还具有可恢复功能。默认情况下,一个简单的 BackOff 实现每五秒重试一次。您可以指定一个自定义的 BackOff 实现,以获得更细粒度的恢复选项。有关示例,请参阅 ExponentialBackOff

与其姊妹容器 (SimpleMessageListenerContainer) 一样,DefaultMessageListenerContainer 支持原生 JMS 事务并允许自定义确认模式。如果您的场景可行,强烈建议使用它而不是外部管理的事务——也就是说,如果您可以接受 JVM 崩溃时偶尔出现重复消息的情况。您的业务逻辑中的自定义重复消息检测步骤可以涵盖这种情况——例如,以业务实体存在检查或协议表检查的形式。任何此类安排都比替代方案效率高得多:用 XA 事务包装您的整个处理(通过使用 JtaTransactionManager 配置您的 DefaultMessageListenerContainer)以涵盖 JMS 消息的接收以及消息监听器中业务逻辑的执行(包括数据库操作等)。

默认的 AUTO_ACKNOWLEDGE 模式不提供适当的可靠性保证。当监听器执行失败(因为提供商在监听器调用后自动确认每条消息,没有异常传播到提供商)或监听器容器关闭(您可以通过设置 acceptMessagesWhileStopping 标志来配置此项)时,消息可能会丢失。在需要可靠性(例如,用于可靠的队列处理和持久主题订阅)的情况下,请确保使用事务性会话。

事务管理

Spring 提供了 JmsTransactionManager,它管理单个 JMS ConnectionFactory 的事务。这使得 JMS 应用程序能够利用 Spring 的托管事务功能,如 数据访问章节的事务管理部分 所述。JmsTransactionManager 执行本地资源事务,将指定 ConnectionFactory 的 JMS 连接/会话对绑定到线程。JmsTemplate 自动检测此类事务资源并相应地操作它们。

在 Jakarta EE 环境中,ConnectionFactory 池化 Connection 和 Session 实例,因此这些资源在事务中得到高效重用。在独立环境中,使用 Spring 的 SingleConnectionFactory 会导致共享 JMS Connection,每个事务都有自己的独立 Session。或者,考虑使用供应商特定的池适配器,例如 ActiveMQ 的 PooledConnectionFactory 类。

您还可以将 JmsTemplateJtaTransactionManager 和支持 XA 的 JMS ConnectionFactory 一起使用以执行分布式事务。请注意,这需要使用 JTA 事务管理器以及正确配置 XA 的 ConnectionFactory。(请查阅您的 Jakarta EE 服务器或 JMS 提供商的文档。)

在使用 JMS API 从 Connection 创建 Session 时,在托管和非托管事务环境之间重用代码可能会令人困惑。这是因为 JMS API 只有一个工厂方法来创建 Session,并且它需要事务和确认模式的值。在托管环境中,设置这些值是环境事务基础设施的责任,因此这些值被供应商对 JMS Connection 的包装器忽略。当您在非托管环境中使用 JmsTemplate 时,您可以通过使用属性 sessionTransactedsessionAcknowledgeMode 来指定这些值。当您将 PlatformTransactionManagerJmsTemplate 一起使用时,模板总是获得一个事务性 JMS Session