元数据存储

许多外部系统、服务或资源不是事务性的(如 Twitter、RSS、文件系统等),并且无法将数据标记为已读。 此外,有时您可能需要在某些集成解决方案中实现企业集成模式 幂等接收器。 为了实现此目标并在与外部系统进行下一次交互之前存储端点的某些先前状态或处理下一条消息,Spring Integration 提供了元数据存储组件,作为 org.springframework.integration.metadata.MetadataStore 接口的实现,具有通用的键值契约。 元数据存储旨在存储各种类型的通用元数据(例如,已处理的最新提要条目的发布日期),以帮助提要适配器等组件处理重复项。 如果组件未直接提供 MetadataStore 的引用,则定位元数据存储的算法如下:首先,在应用程序上下文中查找 ID 为 metadataStore 的 bean。 如果找到,则使用它。 否则,创建一个新的 SimpleMetadataStore 实例,这是一个内存实现,仅在当前运行的应用程序上下文的生命周期内持久化元数据。 这意味着,在重新启动后,您可能会遇到重复的条目。 如果您需要在应用程序上下文重新启动之间持久化元数据,框架提供了以下持久化 MetadataStores

PropertiesPersistingMetadataStore 由属性文件和 PropertiesPersister 支持。 默认情况下,它仅在应用程序上下文正常关闭时持久化状态。 它实现了 Flushable,因此您可以通过调用 flush() 随意持久化状态。 以下示例展示了如何使用 XML 配置“PropertiesPersistingMetadataStore”:

<bean id="metadataStore"
    class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

或者,您可以提供自己的 MetadataStore 接口实现(例如,JdbcMetadataStore),并将其配置为应用程序上下文中的 bean。 从 4.0 版本开始,SimpleMetadataStorePropertiesPersistingMetadataStoreRedisMetadataStore 实现了 ConcurrentMetadataStore。 这些提供原子更新,并且可以在多个组件或应用程序实例中使用。

幂等接收器和元数据存储

当需要过滤已处理的传入消息并可以丢弃它或在丢弃时执行其他逻辑时,元数据存储对于实现 EIP 幂等接收器 模式非常有用。 以下配置显示了如何执行此操作的示例:

<int:filter input-channel="serviceChannel"
			output-channel="idempotentServiceChannel"
			discard-channel="discardChannel"
			expression="@metadataStore.get(headers.businessKey) == null"/>

<int:publish-subscribe-channel id="idempotentServiceChannel"/>

<int:outbound-channel-adapter channel="idempotentServiceChannel"
                              expression="@metadataStore.put(headers.businessKey, '')"/>

<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>

幂等条目的 value 可以是过期日期,在此日期之后,该条目应由某个计划的回收器从元数据存储中删除。

MetadataStoreListener

某些元数据存储(目前只有 Zookeeper)支持注册监听器以在项目更改时接收事件,如以下示例所示:

public interface MetadataStoreListener {

	void onAdd(String key, String value);

	void onRemove(String key, String oldValue);

	void onUpdate(String key, String newValue);
}

有关更多信息,请参阅 Javadoc。 如果您只对部分事件感兴趣,可以子类化 MetadataStoreListenerAdapter