消息转换器
AmqpTemplate 还定义了几个发送和接收消息的方法,这些方法委托给 MessageConverter。
MessageConverter 为每个方向提供一个方法:一个用于 转换为 Message,另一个用于 从 Message 转换。
请注意,当转换为 Message 时,除了对象之外,您还可以提供属性。
object 参数通常对应于消息体。
以下清单显示了 MessageConverter 接口的定义:
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
AmqpTemplate 上相关的消息发送方法比我们之前讨论的方法更简单,因为它们不需要 Message 实例。
相反,MessageConverter 负责通过将提供的对象转换为 Message 主体的字节数组,然后添加任何提供的 MessageProperties 来“创建”每个 Message。
以下清单显示了各种方法的定义:
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
在接收端,只有两个方法:一个接受队列名称,另一个依赖于模板的“queue”属性已被设置。
以下清单显示了这两个方法的定义:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
|
异步消费者 中提到的 |
SimpleMessageConverter
MessageConverter 策略的默认实现称为 SimpleMessageConverter。
如果您没有显式配置替代方案,RabbitTemplate 实例将使用此转换器。
它处理基于文本的内容、序列化的 Java 对象和字节数组。
从 Message 转换
如果输入 Message 的内容类型以 "text" 开头(例如,
"text/plain"),它还会检查 content-encoding 属性,以确定将 Message 正文字节数组转换为 Java String 时使用的字符集。
如果输入 Message 上未设置 content-encoding 属性,则默认使用 UTF-8 字符集。
如果您需要覆盖该默认设置,可以配置 SimpleMessageConverter 实例,设置其 defaultCharset 属性,并将其注入 RabbitTemplate 实例。
如果输入 Message 的 content-type 属性值设置为 "application/x-java-serialized-object",SimpleMessageConverter 会尝试将字节数组反序列化(重新水合)为 Java 对象。
虽然这对于简单的原型设计可能很有用,但我们不建议依赖 Java 序列化,因为它会导致生产者和消费者之间的紧密耦合。
当然,它也排除了在任何一方使用非 Java 系统的可能性。
由于 AMQP 是一个线级协议,如果因这些限制而失去大部分优势,那将是不幸的。
在接下来的两节中,我们将探讨在不依赖 Java 序列化的情况下传递丰富领域对象内容的一些替代方案。
对于所有其他内容类型,SimpleMessageConverter 直接将 Message 正文内容作为字节数组返回。
请参阅 Java 反序列化 以获取重要信息。
SerializerMessageConverter
此转换器类似于 SimpleMessageConverter,不同之处在于它可以配置其他 Spring Framework
Serializer 和 Deserializer 实现,用于 application/x-java-serialized-object 转换。
请参阅 Java 反序列化 以获取重要信息。
Jackson2JsonMessageConverter
本节介绍如何使用 Jackson2JsonMessageConverter 在 Message 之间进行转换。
它包含以下部分:
转换为 Message
如前一节所述,通常不建议依赖 Java 序列化。
一种更灵活且可在不同语言和平台之间移植的常见替代方案是 JSON
(JavaScript Object Notation)。
转换器可以在任何 RabbitTemplate 实例上配置,以覆盖其对 SimpleMessageConverter
默认值的用法。
Jackson2JsonMessageConverter 使用 com.fasterxml.jackson 2.x 库。
以下示例配置 Jackson2JsonMessageConverter:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassMapper"/>
</bean>
</property>
</bean>
如上所示,Jackson2JsonMessageConverter 默认使用 DefaultClassMapper。
类型信息被添加到 MessageProperties 中(并从中检索)。
如果入站消息在 MessageProperties 中不包含类型信息,但您知道预期类型,您
可以通过使用 defaultType 属性配置静态类型,如以下示例所示:
<bean id="jsonConverterWithDefaultType"
class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="thing1.PurchaseOrder"/>
</bean>
</property>
</bean>
此外,您可以提供 TypeId 头部中值的自定义映射。
以下示例显示了如何执行此操作:
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
现在,如果发送系统将头部设置为 thing1,转换器将创建 Thing1 对象,依此类推。
有关从非 Spring 应用程序转换消息的完整讨论,请参阅 从非 Spring 应用程序接收 JSON 示例应用程序。
从版本 2.4.3 开始,如果 supportedMediaType 具有 charset 参数,转换器将不会添加 contentEncoding 消息属性;这也用于编码。
已添加新方法 setSupportedMediaType:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
从 Message 转换
入站消息根据发送系统添加到头部中的类型信息转换为对象。
从版本 2.4.3 开始,如果不存在 contentEncoding 消息属性,转换器将尝试检测 contentType 消息属性中的 charset 参数并使用它。
如果两者都不存在,如果 supportedMediaType 具有 charset 参数,它将用于解码,并最终回退到 defaultCharset 属性。
已添加新方法 setSupportedMediaType:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
在 1.6 版本之前,如果不存在类型信息,转换将失败。 从 1.6 版本开始,如果缺少类型信息,转换器将使用 Jackson 默认值(通常是映射)转换 JSON。
此外,从 1.6 版本开始,当您使用 @RabbitListener 注解(在方法上)时,推断的类型信息会添加到 MessageProperties 中。
这使得转换器可以转换为目标方法的参数类型。
这仅适用于只有一个没有注解的参数或一个带有 @Payload 注解的参数。
类型为 Message 的参数在分析期间被忽略。
默认情况下,推断的类型信息将覆盖发送系统创建的入站 TypeId 和相关头部。
这使得接收系统可以自动转换为不同的领域对象。
这仅适用于参数类型是具体(非抽象或接口)或来自 java.util
包的情况。
在所有其他情况下,将使用 TypeId 和相关头部。
在某些情况下,您可能希望覆盖默认行为并始终使用 TypeId 信息。
例如,假设您有一个接受 Thing1 参数但消息包含 Thing2(它是 Thing1 的子类,它是具体的)的 @RabbitListener。
推断的类型将不正确。
要处理这种情况,请将 Jackson2JsonMessageConverter 上的 TypePrecedence 属性设置为 TYPE_ID 而不是默认的 INFERRED。
(该属性实际上在转换器的 DefaultJackson2JavaTypeMapper 上,但转换器上提供了 setter 以方便使用。)
如果您注入自定义类型映射器,则应在映射器上设置该属性。
|
从 |
@RabbitListener
public void thing1(Thing1 thing1) {...}
@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}
@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}
在前面清单中的前四种情况下,转换器尝试转换为 Thing1 类型。
第五个示例无效,因为我们无法确定哪个参数应该接收消息负载。
对于第六个示例,由于泛型类型是 WildcardType,因此应用 Jackson 默认值。
但是,您可以创建自定义转换器并使用 targetMethod 消息属性来决定将 JSON 转换为哪种类型。
|
这种类型推断只能在方法级别声明 |
从版本 1.6.11 开始,Jackson2JsonMessageConverter 以及 DefaultJackson2JavaTypeMapper (DefaultClassMapper) 提供了 trustedPackages 选项,以克服 序列化小工具 漏洞。
默认情况下,为了向后兼容,Jackson2JsonMessageConverter 信任所有包 — 也就是说,它使用 * 作为选项。
从版本 2.4.7 开始,可以将转换器配置为在 Jackson 反序列化消息体后返回 Optional.empty()(如果 Jackson 返回 null)。
这使得 @RabbitListener 可以通过两种方式接收空负载:
@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
handleOptional(payload); // payload might be null
}
@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
handleOptional(optional.orElse(this.emptyThing));
}
要启用此功能,请将 setNullAsOptionalEmpty 设置为 true;当为 false(默认值)时,转换器将回退到原始消息体 (byte[])。
@Bean
Jackson2JsonMessageConverter converter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setNullAsOptionalEmpty(true);
return converter;
}
反序列化抽象类
在 2.2.8 版本之前,如果 @RabbitListener 的推断类型是抽象类(包括接口),转换器将回退到在头部中查找类型信息,如果存在,则使用该信息;如果不存在,它将尝试创建抽象类。
当使用配置了自定义反序列化器来处理抽象类的自定义 ObjectMapper,但传入消息具有无效类型头部时,这会导致问题。
从 2.2.8 版本开始,默认保留以前的行为。如果您有这样的自定义 ObjectMapper 并且您希望忽略类型头部,并始终使用推断类型进行转换,请将 alwaysConvertToInferredType 设置为 true。
这是为了向后兼容并避免在转换失败(使用标准 ObjectMapper)时尝试转换的开销。
使用 Spring Data Projection 接口
从版本 2.2 开始,您可以将 JSON 转换为 Spring Data Projection 接口而不是具体类型。 这允许对数据进行非常选择性且低耦合的绑定,包括从 JSON 文档中的多个位置查找值。 例如,以下接口可以定义为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,访问器方法将用于查找接收到的 JSON 文档中作为字段的属性名称。
@JsonPath 表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。
要启用此功能,请将消息转换器上的 useProjectionForInterfaces 设置为 true。
您还必须将 spring-data:spring-data-commons 和 com.jayway.jsonpath:json-path 添加到类路径中。
当用作 @RabbitListener 方法的参数时,接口类型会自动像往常一样传递给转换器。
使用 RabbitTemplate 从 Message 转换
如前所述,类型信息在消息头部中传递,以帮助转换器从消息转换。
这在大多数情况下都有效。
但是,当使用泛型类型时,它只能转换简单对象和已知的“容器”对象(列表、数组和映射)。
从 2.0 版本开始,Jackson2JsonMessageConverter 实现了 SmartMessageConverter,这使得它可以通过接受 ParameterizedTypeReference 参数的新 RabbitTemplate 方法使用。
这允许转换复杂的泛型类型,如以下示例所示:
Thing1<Thing2<Cat, Hat>> thing1 =
rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });
|
从版本 2.1 开始, |
MarshallingMessageConverter
另一种选择是 MarshallingMessageConverter。
它委托给 Spring OXM 库的 Marshaller 和 Unmarshaller 策略接口实现。
您可以在 {spring-framework-docs}/data-access/oxm.html[此处] 阅读有关该库的更多信息。
在配置方面,最常见的是只提供构造函数参数,因为大多数 Marshaller 实现也实现了 Unmarshaller。
以下示例显示了如何配置 MarshallingMessageConverter:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>
Jackson2XmlMessageConverter
此类别在 2.1 版中引入,可用于在 XML 之间转换消息。
Jackson2XmlMessageConverter 和 Jackson2JsonMessageConverter 具有相同的基类:AbstractJackson2MessageConverter。
|
引入 |
Jackson2XmlMessageConverter 使用 com.fasterxml.jackson 2.x 库。
您可以像使用 Jackson2JsonMessageConverter 一样使用它,只是它支持 XML 而不是 JSON。
以下示例配置 Jackson2JsonMessageConverter:
<bean id="xmlConverterWithDefaultType"
class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
有关更多信息,请参阅 Jackson2JsonMessageConverter。
|
从版本 2.2 开始,如果不存在 |
ContentTypeDelegatingMessageConverter
此类别在 1.4.2 版中引入,允许根据 MessageProperties 中的内容类型属性委托给特定的 MessageConverter。
默认情况下,如果不存在 contentType 属性或存在与任何已配置转换器都不匹配的值,它会委托给 SimpleMessageConverter。
以下示例配置 ContentTypeDelegatingMessageConverter:
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>
Java 反序列化
本节介绍如何反序列化 Java 对象。
从不受信任的来源反序列化 Java 对象时可能存在漏洞。
如果您接受来自不受信任的来源且 content-type 为 application/x-java-serialized-object 的消息,则应
考虑配置允许反序列化哪些包和类。
这适用于 SimpleMessageConverter 和 SerializerMessageConverter,当它们隐式或通过配置使用
DefaultDeserializer 时。
默认情况下,允许列表为空,这意味着不会反序列化任何类。
您可以设置模式列表,例如 thing1.、thing1.thing2.Cat 或 .MySafeClass。
模式将按顺序检查,直到找到匹配项。
如果未找到匹配项,则抛出 SecurityException。
您可以使用这些转换器上的 allowedListPatterns 属性设置模式。
或者,如果您信任所有消息发起者,可以将环境变量 SPRING_AMQP_DESERIALIZATION_TRUST_ALL 或系统属性 spring.amqp.deserialization.trust.all 设置为 true。
消息属性转换器
MessagePropertiesConverter 策略接口用于在 Rabbit 客户端 BasicProperties 和 Spring AMQP MessageProperties 之间进行转换。
默认实现 (DefaultMessagePropertiesConverter) 通常足以满足大多数目的,但如果需要,您可以实现自己的。
默认属性转换器将 BasicProperties 类型为 LongString 的元素转换为 String 实例,当大小不超过 1024 字节时。
较大的 LongString 实例不进行转换(参见下一段)。
此限制可以用构造函数参数覆盖。
从 1.6 版本开始,长度超过长字符串限制(默认值:1024)的头部现在默认由
DefaultMessagePropertiesConverter 保留为 LongString 实例。
您可以通过 getBytes[]、toString() 或 getStream() 方法访问内容。
以前,DefaultMessagePropertiesConverter 将此类头部“转换”为 DataInputStream(实际上它只是引用了 LongString 实例的 DataInputStream)。
在输出时,此头部未转换(除了转换为字符串 — 例如,java.io.DataInputStream@1d057a39,通过调用流上的 toString())。
现在,大型传入的 LongString 头部在输出时也正确地“转换”(默认情况下)。
提供了一个新的构造函数,允许您配置转换器以像以前一样工作。 以下清单显示了方法的 Javadoc 注释和声明:
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }
同样从 1.6 版本开始,correlationIdString 这一新属性已添加到 MessageProperties 中。
以前,当在 RabbitMQ 客户端使用的 BasicProperties 之间进行转换时,会执行不必要的 byte[] <→ String 转换,因为 MessageProperties.correlationId 是 byte[],但 BasicProperties 使用 String。
(最终,RabbitMQ 客户端使用 UTF-8 将 String 转换为字节,以放入协议消息中)。
为了提供最大的向后兼容性,correlationIdPolicy 这一新属性已添加到
DefaultMessagePropertiesConverter 中。
它接受 DefaultMessagePropertiesConverter.CorrelationIdPolicy 枚举参数。
默认情况下,它设置为 BYTES,这复制了以前的行为。
对于入站消息:
-
STRING:仅映射correlationIdString属性 -
BYTES:仅映射correlationId属性 -
BOTH:同时映射这两个属性
对于出站消息:
-
STRING:仅映射correlationIdString属性 -
BYTES:仅映射correlationId属性 -
BOTH:同时考虑这两个属性,其中String属性优先
同样从 1.6 版本开始,入站 deliveryMode 属性不再映射到 MessageProperties.deliveryMode。
它改为映射到 MessageProperties.receivedDeliveryMode。
此外,入站 userId 属性不再映射到 MessageProperties.userId。
它改为映射到 MessageProperties.receivedUserId。
这些更改是为了避免如果同一个 MessageProperties 对象用于出站消息时这些属性的意外传播。
从 2.2 版本开始,DefaultMessagePropertiesConverter 使用 getName() 而不是 toString() 转换任何值为 Class<?> 类型的自定义头部;这避免了消费应用程序必须从 toString() 表示中解析类名。
对于滚动升级,您可能需要更改消费者以理解这两种格式,直到所有生产者都升级。