消息转换器

AmqpTemplate 还定义了几个发送和接收消息的方法,这些方法委托给 MessageConverterMessageConverter 为每个方向提供一个方法:一个用于 转换为 Message,另一个用于 Message 转换。 请注意,当转换为 Message 时,除了对象之外,您还可以提供属性。 object 参数通常对应于消息体。 以下清单显示了 MessageConverter 接口的定义:

public interface MessageConverter {

    Message toMessage(Object object, MessageProperties messageProperties)
            throws MessageConversionException;

    Object fromMessage(Message message) throws MessageConversionException;

}

AmqpTemplate 上相关的消息发送方法比我们之前讨论的方法更简单,因为它们不需要 Message 实例。 相反,MessageConverter 负责通过将提供的对象转换为 Message 主体的字节数组,然后添加任何提供的 MessageProperties 来“创建”每个 Message。 以下清单显示了各种方法的定义:

void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)
    throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
    throws AmqpException;

void convertAndSend(String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

在接收端,只有两个方法:一个接受队列名称,另一个依赖于模板的“queue”属性已被设置。 以下清单显示了这两个方法的定义:

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

异步消费者 中提到的 MessageListenerAdapter 也使用 MessageConverter

SimpleMessageConverter

MessageConverter 策略的默认实现称为 SimpleMessageConverter。 如果您没有显式配置替代方案,RabbitTemplate 实例将使用此转换器。 它处理基于文本的内容、序列化的 Java 对象和字节数组。

Message 转换

如果输入 Message 的内容类型以 "text" 开头(例如, "text/plain"),它还会检查 content-encoding 属性,以确定将 Message 正文字节数组转换为 Java String 时使用的字符集。 如果输入 Message 上未设置 content-encoding 属性,则默认使用 UTF-8 字符集。 如果您需要覆盖该默认设置,可以配置 SimpleMessageConverter 实例,设置其 defaultCharset 属性,并将其注入 RabbitTemplate 实例。

如果输入 Message 的 content-type 属性值设置为 "application/x-java-serialized-object",SimpleMessageConverter 会尝试将字节数组反序列化(重新水合)为 Java 对象。 虽然这对于简单的原型设计可能很有用,但我们不建议依赖 Java 序列化,因为它会导致生产者和消费者之间的紧密耦合。 当然,它也排除了在任何一方使用非 Java 系统的可能性。 由于 AMQP 是一个线级协议,如果因这些限制而失去大部分优势,那将是不幸的。 在接下来的两节中,我们将探讨在不依赖 Java 序列化的情况下传递丰富领域对象内容的一些替代方案。

对于所有其他内容类型,SimpleMessageConverter 直接将 Message 正文内容作为字节数组返回。

请参阅 Java 反序列化 以获取重要信息。

转换为 Message

当从任意 Java 对象转换为 Message 时,SimpleMessageConverter 同样处理字节数组、字符串和可序列化实例。 它将这些类型转换为字节(对于字节数组,无需转换),并相应地设置 content-type 属性。 如果要转换的 Object 不匹配这些类型之一,则 Message 正文将为 null。

SerializerMessageConverter

此转换器类似于 SimpleMessageConverter,不同之处在于它可以配置其他 Spring Framework SerializerDeserializer 实现,用于 application/x-java-serialized-object 转换。

请参阅 Java 反序列化 以获取重要信息。

Jackson2JsonMessageConverter

本节介绍如何使用 Jackson2JsonMessageConverterMessage 之间进行转换。 它包含以下部分:

转换为 Message

如前一节所述,通常不建议依赖 Java 序列化。 一种更灵活且可在不同语言和平台之间移植的常见替代方案是 JSON (JavaScript Object Notation)。 转换器可以在任何 RabbitTemplate 实例上配置,以覆盖其对 SimpleMessageConverter 默认值的用法。 Jackson2JsonMessageConverter 使用 com.fasterxml.jackson 2.x 库。 以下示例配置 Jackson2JsonMessageConverter

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
            <!-- if necessary, override the DefaultClassMapper -->
            <property name="classMapper" ref="customClassMapper"/>
        </bean>
    </property>
</bean>

如上所示,Jackson2JsonMessageConverter 默认使用 DefaultClassMapper。 类型信息被添加到 MessageProperties 中(并从中检索)。 如果入站消息在 MessageProperties 中不包含类型信息,但您知道预期类型,您 可以通过使用 defaultType 属性配置静态类型,如以下示例所示:

<bean id="jsonConverterWithDefaultType"
      class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
    <property name="classMapper">
        <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
            <property name="defaultType" value="thing1.PurchaseOrder"/>
        </bean>
    </property>
</bean>

此外,您可以提供 TypeId 头部中值的自定义映射。 以下示例显示了如何执行此操作:

@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    jsonConverter.setClassMapper(classMapper());
    return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
    DefaultClassMapper classMapper = new DefaultClassMapper();
    Map<String, Class<?>> idClassMapping = new HashMap<>();
    idClassMapping.put("thing1", Thing1.class);
    idClassMapping.put("thing2", Thing2.class);
    classMapper.setIdClassMapping(idClassMapping);
    return classMapper;
}

现在,如果发送系统将头部设置为 thing1,转换器将创建 Thing1 对象,依此类推。 有关从非 Spring 应用程序转换消息的完整讨论,请参阅 从非 Spring 应用程序接收 JSON 示例应用程序。

从版本 2.4.3 开始,如果 supportedMediaType 具有 charset 参数,转换器将不会添加 contentEncoding 消息属性;这也用于编码。 已添加新方法 setSupportedMediaType

String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

Message 转换

入站消息根据发送系统添加到头部中的类型信息转换为对象。

从版本 2.4.3 开始,如果不存在 contentEncoding 消息属性,转换器将尝试检测 contentType 消息属性中的 charset 参数并使用它。 如果两者都不存在,如果 supportedMediaType 具有 charset 参数,它将用于解码,并最终回退到 defaultCharset 属性。 已添加新方法 setSupportedMediaType

String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

在 1.6 版本之前,如果不存在类型信息,转换将失败。 从 1.6 版本开始,如果缺少类型信息,转换器将使用 Jackson 默认值(通常是映射)转换 JSON。

此外,从 1.6 版本开始,当您使用 @RabbitListener 注解(在方法上)时,推断的类型信息会添加到 MessageProperties 中。 这使得转换器可以转换为目标方法的参数类型。 这仅适用于只有一个没有注解的参数或一个带有 @Payload 注解的参数。 类型为 Message 的参数在分析期间被忽略。

默认情况下,推断的类型信息将覆盖发送系统创建的入站 TypeId 和相关头部。 这使得接收系统可以自动转换为不同的领域对象。 这仅适用于参数类型是具体(非抽象或接口)或来自 java.util 包的情况。 在所有其他情况下,将使用 TypeId 和相关头部。 在某些情况下,您可能希望覆盖默认行为并始终使用 TypeId 信息。 例如,假设您有一个接受 Thing1 参数但消息包含 Thing2(它是 Thing1 的子类,它是具体的)的 @RabbitListener。 推断的类型将不正确。 要处理这种情况,请将 Jackson2JsonMessageConverter 上的 TypePrecedence 属性设置为 TYPE_ID 而不是默认的 INFERRED。 (该属性实际上在转换器的 DefaultJackson2JavaTypeMapper 上,但转换器上提供了 setter 以方便使用。) 如果您注入自定义类型映射器,则应在映射器上设置该属性。

Message 转换时,传入的 MessageProperties.getContentType() 必须符合 JSON 规范(使用 contentType.contains("json") 进行检查)。 从 2.2 版本开始,如果不存在 contentType 属性,或者它具有默认值 application/octet-stream,则假定为 application/json。 要恢复到以前的行为(返回未转换的 byte[]),请将转换器的 assumeSupportedContentType 属性设置为 false。 如果内容类型不受支持,则会发出 WARN 级别的日志消息 Could not convert incoming message with content-type […​],并按原样返回 message.getBody() — 作为 byte[]。 因此,为了满足消费者端 Jackson2JsonMessageConverter 的要求,生产者必须添加 contentType 消息属性 — 例如,作为 application/jsontext/x-json,或使用 Jackson2JsonMessageConverter,它会自动设置头部。 以下清单显示了许多转换器调用:

@RabbitListener
public void thing1(Thing1 thing1) {...}

@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}

@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}

在前面清单中的前四种情况下,转换器尝试转换为 Thing1 类型。 第五个示例无效,因为我们无法确定哪个参数应该接收消息负载。 对于第六个示例,由于泛型类型是 WildcardType,因此应用 Jackson 默认值。

但是,您可以创建自定义转换器并使用 targetMethod 消息属性来决定将 JSON 转换为哪种类型。

这种类型推断只能在方法级别声明 @RabbitListener 注解时实现。 对于类级别的 @RabbitListener,转换后的类型用于选择要调用的 @RabbitHandler 方法。 因此,基础设施提供了 targetObject 消息属性,您可以在自定义转换器中使用它来确定类型。

从版本 1.6.11 开始,Jackson2JsonMessageConverter 以及 DefaultJackson2JavaTypeMapper (DefaultClassMapper) 提供了 trustedPackages 选项,以克服 序列化小工具 漏洞。 默认情况下,为了向后兼容,Jackson2JsonMessageConverter 信任所有包 — 也就是说,它使用 * 作为选项。

从版本 2.4.7 开始,可以将转换器配置为在 Jackson 反序列化消息体后返回 Optional.empty()(如果 Jackson 返回 null)。 这使得 @RabbitListener 可以通过两种方式接收空负载:

@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
    handleOptional(payload); // payload might be null
}

@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
    handleOptional(optional.orElse(this.emptyThing));
}

要启用此功能,请将 setNullAsOptionalEmpty 设置为 true;当为 false(默认值)时,转换器将回退到原始消息体 (byte[])。

@Bean
Jackson2JsonMessageConverter converter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setNullAsOptionalEmpty(true);
    return converter;
}

反序列化抽象类

在 2.2.8 版本之前,如果 @RabbitListener 的推断类型是抽象类(包括接口),转换器将回退到在头部中查找类型信息,如果存在,则使用该信息;如果不存在,它将尝试创建抽象类。 当使用配置了自定义反序列化器来处理抽象类的自定义 ObjectMapper,但传入消息具有无效类型头部时,这会导致问题。

从 2.2.8 版本开始,默认保留以前的行为。如果您有这样的自定义 ObjectMapper 并且您希望忽略类型头部,并始终使用推断类型进行转换,请将 alwaysConvertToInferredType 设置为 true。 这是为了向后兼容并避免在转换失败(使用标准 ObjectMapper)时尝试转换的开销。

使用 Spring Data Projection 接口

从版本 2.2 开始,您可以将 JSON 转换为 Spring Data Projection 接口而不是具体类型。 这允许对数据进行非常选择性且低耦合的绑定,包括从 JSON 文档中的多个位置查找值。 例如,以下接口可以定义为消息负载类型:

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

默认情况下,访问器方法将用于查找接收到的 JSON 文档中作为字段的属性名称。 @JsonPath 表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。

要启用此功能,请将消息转换器上的 useProjectionForInterfaces 设置为 true。 您还必须将 spring-data:spring-data-commonscom.jayway.jsonpath:json-path 添加到类路径中。

当用作 @RabbitListener 方法的参数时,接口类型会自动像往常一样传递给转换器。

使用 RabbitTemplateMessage 转换

如前所述,类型信息在消息头部中传递,以帮助转换器从消息转换。 这在大多数情况下都有效。 但是,当使用泛型类型时,它只能转换简单对象和已知的“容器”对象(列表、数组和映射)。 从 2.0 版本开始,Jackson2JsonMessageConverter 实现了 SmartMessageConverter,这使得它可以通过接受 ParameterizedTypeReference 参数的新 RabbitTemplate 方法使用。 这允许转换复杂的泛型类型,如以下示例所示:

Thing1<Thing2<Cat, Hat>> thing1 =
    rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });

从版本 2.1 开始,AbstractJsonMessageConverter 类已被删除。 它不再是 Jackson2JsonMessageConverter 的基类。 它已被 AbstractJackson2MessageConverter 取代。

MarshallingMessageConverter

另一种选择是 MarshallingMessageConverter。 它委托给 Spring OXM 库的 MarshallerUnmarshaller 策略接口实现。 您可以在 {spring-framework-docs}/data-access/oxm.html[此处] 阅读有关该库的更多信息。 在配置方面,最常见的是只提供构造函数参数,因为大多数 Marshaller 实现也实现了 Unmarshaller。 以下示例显示了如何配置 MarshallingMessageConverter

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
            <constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
        </bean>
    </property>
</bean>

Jackson2XmlMessageConverter

此类别在 2.1 版中引入,可用于在 XML 之间转换消息。

Jackson2XmlMessageConverterJackson2JsonMessageConverter 具有相同的基类:AbstractJackson2MessageConverter

引入 AbstractJackson2MessageConverter 类是为了替换已删除的类:AbstractJsonMessageConverter

Jackson2XmlMessageConverter 使用 com.fasterxml.jackson 2.x 库。

您可以像使用 Jackson2JsonMessageConverter 一样使用它,只是它支持 XML 而不是 JSON。 以下示例配置 Jackson2JsonMessageConverter

<bean id="xmlConverterWithDefaultType"
        class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
    <property name="classMapper">
        <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
            <property name="defaultType" value="foo.PurchaseOrder"/>
        </bean>
    </property>
</bean>

有关更多信息,请参阅 Jackson2JsonMessageConverter

从版本 2.2 开始,如果不存在 contentType 属性,或者它具有默认值 application/octet-stream,则假定为 application/xml。 要恢复到以前的行为(返回未转换的 byte[]),请将转换器的 assumeSupportedContentType 属性设置为 false

ContentTypeDelegatingMessageConverter

此类别在 1.4.2 版中引入,允许根据 MessageProperties 中的内容类型属性委托给特定的 MessageConverter。 默认情况下,如果不存在 contentType 属性或存在与任何已配置转换器都不匹配的值,它会委托给 SimpleMessageConverter。 以下示例配置 ContentTypeDelegatingMessageConverter

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json" value-ref="jsonMessageConverter" />
            <entry key="application/xml" value-ref="xmlMessageConverter" />
        </map>
    </property>
</bean>

Java 反序列化

本节介绍如何反序列化 Java 对象。

从不受信任的来源反序列化 Java 对象时可能存在漏洞。 如果您接受来自不受信任的来源且 content-typeapplication/x-java-serialized-object 的消息,则应 考虑配置允许反序列化哪些包和类。 这适用于 SimpleMessageConverterSerializerMessageConverter,当它们隐式或通过配置使用 DefaultDeserializer 时。 默认情况下,允许列表为空,这意味着不会反序列化任何类。 您可以设置模式列表,例如 thing1.thing1.thing2.Cat.MySafeClass。 模式将按顺序检查,直到找到匹配项。 如果未找到匹配项,则抛出 SecurityException。 您可以使用这些转换器上的 allowedListPatterns 属性设置模式。 或者,如果您信任所有消息发起者,可以将环境变量 SPRING_AMQP_DESERIALIZATION_TRUST_ALL 或系统属性 spring.amqp.deserialization.trust.all 设置为 true

消息属性转换器

MessagePropertiesConverter 策略接口用于在 Rabbit 客户端 BasicProperties 和 Spring AMQP MessageProperties 之间进行转换。 默认实现 (DefaultMessagePropertiesConverter) 通常足以满足大多数目的,但如果需要,您可以实现自己的。 默认属性转换器将 BasicProperties 类型为 LongString 的元素转换为 String 实例,当大小不超过 1024 字节时。 较大的 LongString 实例不进行转换(参见下一段)。 此限制可以用构造函数参数覆盖。

从 1.6 版本开始,长度超过长字符串限制(默认值:1024)的头部现在默认由 DefaultMessagePropertiesConverter 保留为 LongString 实例。 您可以通过 getBytes[]toString()getStream() 方法访问内容。

以前,DefaultMessagePropertiesConverter 将此类头部“转换”为 DataInputStream(实际上它只是引用了 LongString 实例的 DataInputStream)。 在输出时,此头部未转换(除了转换为字符串 — 例如,java.io.DataInputStream@1d057a39,通过调用流上的 toString())。

现在,大型传入的 LongString 头部在输出时也正确地“转换”(默认情况下)。

提供了一个新的构造函数,允许您配置转换器以像以前一样工作。 以下清单显示了方法的 Javadoc 注释和声明:

/**
 * Construct an instance where LongStrings will be returned
 * unconverted or as a java.io.DataInputStream when longer than this limit.
 * Use this constructor with 'true' to restore pre-1.6 behavior.
 * @param longStringLimit the limit.
 * @param convertLongLongStrings LongString when false,
 * DataInputStream when true.
 * @since 1.6
 */
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

同样从 1.6 版本开始,correlationIdString 这一新属性已添加到 MessageProperties 中。 以前,当在 RabbitMQ 客户端使用的 BasicProperties 之间进行转换时,会执行不必要的 byte[] <→ String 转换,因为 MessageProperties.correlationIdbyte[],但 BasicProperties 使用 String。 (最终,RabbitMQ 客户端使用 UTF-8 将 String 转换为字节,以放入协议消息中)。

为了提供最大的向后兼容性,correlationIdPolicy 这一新属性已添加到 DefaultMessagePropertiesConverter 中。 它接受 DefaultMessagePropertiesConverter.CorrelationIdPolicy 枚举参数。 默认情况下,它设置为 BYTES,这复制了以前的行为。

对于入站消息:

  • STRING:仅映射 correlationIdString 属性

  • BYTES:仅映射 correlationId 属性

  • BOTH:同时映射这两个属性

对于出站消息:

  • STRING:仅映射 correlationIdString 属性

  • BYTES:仅映射 correlationId 属性

  • BOTH:同时考虑这两个属性,其中 String 属性优先

同样从 1.6 版本开始,入站 deliveryMode 属性不再映射到 MessageProperties.deliveryMode。 它改为映射到 MessageProperties.receivedDeliveryMode。 此外,入站 userId 属性不再映射到 MessageProperties.userId。 它改为映射到 MessageProperties.receivedUserId。 这些更改是为了避免如果同一个 MessageProperties 对象用于出站消息时这些属性的意外传播。

从 2.2 版本开始,DefaultMessagePropertiesConverter 使用 getName() 而不是 toString() 转换任何值为 Class<?> 类型的自定义头部;这避免了消费应用程序必须从 toString() 表示中解析类名。 对于滚动升级,您可能需要更改消费者以理解这两种格式,直到所有生产者都升级。