消息

Spring Integration 的 Message 是一个通用的数据容器。 任何对象都可以作为有效载荷,并且每个 Message 实例都包含以键值对形式存储的用户可扩展属性的头部。

Message 接口

以下清单显示了 Message 接口的定义:

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message 接口是 API 的核心部分。 通过将数据封装在通用包装器中,消息系统可以在不知道数据类型的情况下传递它。 随着应用程序演进以支持新类型,或者当类型本身被修改或扩展时,消息系统不会受到影响。 另一方面,当消息系统中的某个组件确实需要访问有关 Message 的信息时,此类元数据通常可以存储到消息头部中并从中检索。

消息头部

正如 Spring Integration 允许任何 Object 作为 Message 的有效载荷一样,它也支持任何 Object 类型作为头部值。 实际上,MessageHeaders 类实现了 java.util.Map_ 接口,如下面的类定义所示:

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}

尽管 MessageHeaders 类实现了 Map,但它实际上是一个只读实现。 任何尝试在 Map 中 put 值的操作都会导致 UnsupportedOperationExceptionremoveclear 也是如此。 由于消息可能会传递给多个消费者,因此 Map 的结构无法修改。 同样,消息的有效载荷 Object 在初始创建后不能被 set。 然而,头部值本身(或有效载荷 Object)的可变性有意留给框架用户决定。

作为 Map 的实现,可以通过调用 get(..) 并传入头部名称来检索头部。 或者,您可以将预期的 Class 作为附加参数提供。 更好的是,当检索预定义值之一时,可以使用方便的 getter。 以下示例显示了这三种选项:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的消息头部:

Table 1. 预定义消息头部
头部名称 头部类型 用途
    MessageHeaders.ID
    java.util.UUID

此消息实例的标识符。 每次消息被修改时都会更改。

    MessageHeaders.
TIMESTAMP
    java.lang.Long

消息创建的时间。 每次消息被修改时都会更改。

    MessageHeaders.
REPLY_CHANNEL
    java.lang.Object
(String or
MessageChannel)

当没有配置显式输出通道且没有 ROUTING_SLIPROUTING_SLIP 已耗尽时,回复(如果有)将发送到的通道。 如果值为 String,则它必须表示一个 bean 名称或由 ChannelRegistry 生成。

    MessageHeaders.
ERROR_CHANNEL
    java.lang.Object
(String or
MessageChannel)

错误将发送到的通道。 如果值为 String,则它必须表示一个 bean 名称或由 ChannelRegistry 生成。

许多入站和出站适配器实现也提供或期望某些头部,并且您可以配置其他用户定义的头部。 这些头部的常量可以在存在此类头部的模块中找到,例如 AmqpHeadersJmsHeaders 等。

MessageHeaderAccessor API

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息抽象已移至 spring-messaging 模块,并且引入了 MessageHeaderAccessor API 以提供消息实现之上的额外抽象。 所有(核心)Spring Integration 特定的消息头部常量现在都在 IntegrationMessageHeaderAccessor 类中声明。 下表描述了预定义的消息头部:

Table 2. 预定义消息头部
头部名称 头部类型 用途
    IntegrationMessageHeaderAccessor.
CORRELATION_ID
    java.lang.Object

用于关联两条或多条消息。

    IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
    java.lang.Integer

通常是具有 SEQUENCE_SIZE 的一组消息中的序列号,但也可以在 <resequencer/> 中用于重新排序无界限的消息组。

    IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
    java.lang.Integer

一组关联消息中的消息数量。

    IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
    java.lang.Long

指示消息何时过期。 框架不直接使用,但可以通过头部丰富器设置并在配置了 UnexpiredMessageSelector<filter/> 中使用。

    IntegrationMessageHeaderAccessor.
PRIORITY
    java.lang.Integer

消息优先级——例如,在 PriorityChannel 中。

    IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
    java.lang.Boolean

如果消息被幂等接收器拦截器检测为重复消息,则为 True。 请参阅 幂等接收器企业集成模式

    IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
    java.io.Closeable

如果消息与在消息处理完成后应关闭的 Closeable 相关联,则存在此头部。 一个示例是使用 FTP、SFTP 等进行流式文件传输时关联的 Session

    IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
    java.lang.
AtomicInteger

如果消息驱动的通道适配器支持 RetryTemplate 的配置,则此头部包含当前的传递尝试次数。

    IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
    o.s.i.support.
Acknowledgment
Callback

如果入站端点支持,则回调以接受、拒绝或重新排队消息。 请参阅 延迟确认可轮询消息源MQTT 手动确认

IntegrationMessageHeaderAccessor 类提供了其中一些头部的便捷类型化 getter,如下例所示:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出现在 IntegrationMessageHeaderAccessor 中但通常不被用户代码使用的头部(也就是说,它们通常由 Spring Integration 的内部部分使用——此处包含它们是为了完整性):

Table 3. 预定义消息头部
头部名称 头部类型 用途
    IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
    java.util.
List&amp;lt;List&amp;lt;Object&amp;gt;&amp;gt;

当需要嵌套关联时使用的关联数据堆栈(例如, splitter→…​→splitter→…​→aggregator→…​→aggregator)。

    IntegrationMessageHeaderAccessor.
ROUTING_SLIP
    java.util.
Map&amp;lt;List&amp;lt;Object&amp;gt;, Integer&amp;gt;

请参阅 路由单

消息 ID 生成

当消息在应用程序中转换时,每次它被修改(例如, 通过转换器)都会分配一个新的消息 ID。 消息 ID 是一个 UUID。 从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比以前的 java.util.UUID.randomUUID() 实现更高效。 它使用基于安全随机种子的简单随机数,而不是每次都创建安全随机数。

可以通过在应用程序上下文中声明一个实现 org.springframework.util.IdGenerator 的 bean 来选择不同的 UUID 生成策略。

在一个类加载器中只能使用一种 UUID 生成策略。 这意味着,如果两个或更多应用程序上下文在同一个类加载器中运行,它们将共享相同的策略。 如果其中一个上下文更改了策略,则所有上下文都将使用它。 如果同一个类加载器中的两个或更多上下文声明了一个 org.springframework.util.IdGenerator 类型的 bean,它们都必须是同一个类的实例。 否则,尝试替换自定义策略的上下文将无法初始化。 如果策略相同但已参数化,则使用第一个初始化上下文中的策略。

除了默认策略之外,还提供了两个额外的 IdGeneratorsorg.springframework.util.JdkIdGenerator 使用以前的 UUID.randomUUID() 机制。 当实际上不需要 UUID 并且简单的递增值就足够时,可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

只读头部

MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读头部,无法覆盖。

从版本 4.3.2 开始,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API,用于自定义不应从上游 Message 复制的头部列表。 默认情况下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读的。 全局 spring.integration.readOnly.headers 属性(请参阅 全局属性)用于为框架组件自定义 DefaultMessageBuilderFactory。 当您不想由 ObjectToJsonTransformer 填充某些开箱即用的头部(例如 contentType)时,这会很有用(请参阅 JSON 转换器)。

当您尝试使用 MessageBuilder 构建新消息时,此类头部将被忽略,并且会向日志发出特定的 INFO 消息。

从版本 5.0 开始,消息网关头部丰富器内容丰富器头部过滤器 在使用 DefaultMessageBuilderFactory 时不允许您配置 MessageHeaders.IDMessageHeaders.TIMESTAMP 头部名称,并且它们会抛出 BeanInitializationException

头部传播

当消息通过消息生成端点(例如 服务激活器)处理(和修改)时,通常,入站头部会传播到出站消息。 一个例外是 转换器,当完整的消息返回到框架时。 在这种情况下,用户代码负责整个出站消息。 当转换器只返回有效载荷时,入站头部会传播。 此外,头部仅在出站消息中不存在时才传播,允许您根据需要更改头部值。

从版本 4.3.10 开始,您可以配置消息处理器(修改消息并产生输出)以抑制特定头部的传播。 要配置您不想复制的头部,请在 MessageProducingMessageHandler 抽象类上调用 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。

您还可以通过将 META-INF/spring.integration.properties 中的 readOnlyHeaders 属性设置为以逗号分隔的头部列表来全局抑制特定消息头部的传播。

从版本 5.0 开始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 实现应用简单模式(xxx*xxx*xxxxxx*yyy)以允许过滤具有共同后缀或前缀的头部。 有关更多信息,请参阅 PatternMatchUtils Javadoc。 当其中一种模式是 *(星号)时,不传播任何头部。 所有其他模式都将被忽略。 在这种情况下,服务激活器的行为与转换器相同,并且任何所需的头部都必须在服务方法返回的 Message 中提供。 Java DSL 的 ConsumerEndpointSpec 中提供了 notPropagatedHeaders() 选项。 它也适用于 <service-activator> 组件的 XML 配置,作为 not-propagated-headers 属性。

头部传播抑制不适用于不修改消息的端点,例如 桥接器路由器

消息实现

Message 接口的基本实现是 GenericMessage<T>,它提供了两个构造函数,如下面的清单所示:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

当创建 Message 时,会生成一个随机的唯一 ID。 接受 Map 头部信息的构造函数会将提供的头部复制到新创建的 Message 中。

还有一个方便的 Message 实现,旨在传达错误条件。 此实现将其有效载荷作为 Throwable 对象,如下例所示:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了 GenericMessage 基类是参数化的事实。 因此,如两个示例所示,检索 Message 有效载荷 Object 时不需要进行任何强制转换。

上述 Message 类实现是不可变的。 在某些情况下,当可变性不是问题并且应用程序逻辑设计良好以避免并发修改时,可以使用 MutableMessage

MessageBuilder 辅助类

您可能会注意到 Message 接口定义了其有效载荷和头部的检索方法,但未提供任何 setter。 这是因为 Message 在初始创建后无法修改。 因此,当 Message 实例发送到多个消费者(例如,通过发布-订阅通道)时,如果其中一个消费者需要发送带有不同有效载荷类型的回复,它必须创建一个新的 Message。 因此,其他消费者不会受到这些更改的影响。 请记住,多个消费者可以访问相同的有效载荷实例或头部值,并且此类实例本身是否不可变由您决定。 换句话说,Message 实例的契约类似于不可修改的 CollectionMessageHeaders 映射进一步证明了这一点。 尽管 MessageHeaders 类实现了 java.util.Map,但任何尝试对 MessageHeaders 实例调用 put 操作(或“remove”或“clear”)都会导致 UnsupportedOperationException

Spring Integration 不要求创建和填充 Map 以传递到 GenericMessage 构造函数中,而是提供了一种更方便的方式来构造消息:MessageBuilderMessageBuilder 提供了两种工厂方法,用于从现有 Message 或带有有效载荷 Object 创建 Message 实例。 从现有 Message 构建时,该 Message 的头部和有效载荷将被复制到新的 Message 中,如下例所示:

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要创建一个带有新有效载荷但仍想从现有 Message 复制头部的 Message,您可以使用其中一个“copy”方法,如下例所示:

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,copyHeadersIfAbsent 方法不会覆盖现有值。 此外,在前面的示例中,您可以看到如何使用 setHeader 设置任何用户定义的头部。 最后,预定义头部以及设置任何头部(MessageHeaders 也定义了预定义头部名称的常量)的非破坏性方法都提供了 set 方法。

您还可以使用 MessageBuilder 设置消息的优先级,如下例所示:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

priority 头部仅在使用 PriorityChannel 时才会被考虑(如下一章所述)。 它被定义为 java.lang.Integer

MutableMessageBuilder 用于处理 MutableMessage 实例。 此类的逻辑是创建 MutableMessage 或保持原样并通过构建器方法修改其内容。 这样,当消息交换不涉及不变性时,运行应用程序的性能会略有提高。

从版本 6.4 开始,BaseMessageBuilder 类从 MessageBuilder 中提取出来,以简化默认消息构建逻辑的扩展。 例如,与自定义 MessageBuilderFactory 一起,自定义 BaseMessageBuilder 实现可以在应用程序上下文中全局使用,以提供自定义 Message 实例。 特别是,可以覆盖 GenericMessage.toString() 方法,以便在记录此类消息时隐藏有效载荷和头部中的敏感信息。

MessageBuilderFactory 抽象

MessageBuilderFactory bean,带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME,全局注册到应用程序上下文中,并在框架中到处用于创建 Message 实例。 默认情况下,它是 DefaultMessageBuilderFactory 的实例。 开箱即用,框架还提供了一个 MutableMessageBuilderFactory,用于在框架组件中创建 MutableMessage 实例。 要自定义 Message 实例的创建,必须在目标应用程序上下文中提供一个带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAMEMessageBuilderFactory bean,以覆盖默认的。 例如,可以为 BaseMessageBuilder 的实现注册一个自定义 MessageBuilderFactory,我们希望在那里提供一个带有重写的 toString()GenericMessage 扩展,以便在记录此类消息时隐藏有效载荷和头部中的敏感信息。

这些类的一些快速实现来演示个人身份信息缓解措施可以是这样的:

class PiiMessageBuilderFactory implements MessageBuilderFactory {

	@Override
	public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
	    return new PiiMessageBuilder<>(message.getPayload(), message);
	}

	@Override
	public <T> PiiMessageBuilder<T> withPayload(T payload) {
	    return new PiiMessageBuilder<>(payload, null);
	}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

    public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
        super(payload, originalMessage);
    }

    @Override
    public Message<P> build() {
        return new PiiMessage<>(getPayload(), getHeaders());
    }

}

class PiiMessage<P> extends GenericMessage<P> {

    @Serial
    private static final long serialVersionUID = -354503673433669578L;

    public PiiMessage(P payload, Map<String, Object> headers) {
        super(payload, headers);
    }

    @Override
    public String toString() {
        return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
    }

    private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
        return headers.entrySet()
                .stream()
                .map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

}

然后,此 PiiMessageBuilderFactory 可以注册为一个 bean,并且每当框架记录消息时(例如,在 errorChannel 的情况下),password 头部将被屏蔽。