Hazelcast 支持

Spring Integration 提供了通道适配器和其他实用组件,用于与内存数据网格 Hazelcast 交互。 你需要将此依赖项添加到你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:{project-version}"

Hazelcast 组件的 XML 命名空间和 schemaLocation 定义如下:

xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
          https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"

Hazelcast 事件驱动入站通道适配器

Hazelcast 提供了分布式数据结构,例如:

  • com.hazelcast.map.IMap

  • com.hazelcast.multimap.MultiMap

  • com.hazelcast.collection.IList

  • com.hazelcast.collection.ISet

  • com.hazelcast.collection.IQueue

  • com.hazelcast.topic.ITopic

  • com.hazelcast.replicatedmap.ReplicatedMap

它还提供了事件监听器,用于监听对这些数据结构的修改。

  • com.hazelcast.core.EntryListener<K, V>

  • com.hazelcast.collection.ItemListener

  • com.hazelcast.topic.MessageListener

Hazelcast 事件驱动入站通道适配器监听相关的缓存事件,并将事件消息发送到定义的通道。 它支持 XML 和 JavaConfig 驱动的配置。

XML 配置:

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                      cache="map"
                      cache-events="UPDATED, REMOVED"
                      cache-listening-policy="SINGLE" />

Hazelcast 事件驱动入站通道适配器需要以下属性:

  • channel:指定消息发送到的通道;

  • cache:指定被监听的分布式对象引用。 这是一个强制属性;

  • cache-events:指定要监听的缓存事件。 这是一个可选属性,其默认值为 ADDED。 其支持的值如下:

  • IMapMultiMap 支持的缓存事件类型:ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL

  • ReplicatedMap 支持的缓存事件类型:ADDEDREMOVEDUPDATEDEVICTED

  • IListISetIQueue 支持的缓存事件类型:ADDEDREMOVEDITopic 没有缓存事件类型。

  • cache-listening-policy:指定缓存监听策略为 SINGLEALL。 这是一个可选属性,其默认值为 SINGLE。 每个监听相同缓存对象并具有相同 cache-events 属性的 Hazelcast 入站通道适配器,可以接收单个事件消息或所有事件消息。 如果设置为 ALL,则所有监听相同缓存对象并具有相同 cache-events 属性的 Hazelcast 入站通道适配器将接收所有事件消息。 如果设置为 SINGLE,它们将接收唯一的事件消息。

一些配置示例:

分布式 Map
<int:channel id="mapChannel"/>

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                              cache="map"
                              cache-events="UPDATED, REMOVED" />

<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="map"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
分布式 MultiMap
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
                              cache="multiMap"
                              cache-events="ADDED, REMOVED, CLEAR_ALL" />

<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
    <constructor-arg value="multiMap"/>
</bean>
分布式 List
<int-hazelcast:inbound-channel-adapter  channel="listChannel"
                               cache="list"
                               cache-events="ADDED, REMOVED"
                               cache-listening-policy="ALL" />

<bean id="list" factory-bean="instance" factory-method="getList">
    <constructor-arg value="list"/>
</bean>
分布式 Set
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />

<bean id="set" factory-bean="instance" factory-method="getSet">
    <constructor-arg value="set"/>
</bean>
分布式 Queue
<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
                               cache="queue"
                               cache-events="REMOVED"
                               cache-listening-policy="ALL" />

<bean id="queue" factory-bean="instance" factory-method="getQueue">
    <constructor-arg value="queue"/>
</bean>
分布式 Topic
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />

<bean id="topic" factory-bean="instance" factory-method="getTopic">
    <constructor-arg value="topic"/>
</bean>
复制 Map
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
                              cache="replicatedMap"
                              cache-events="ADDED, UPDATED, REMOVED"
                              cache-listening-policy="SINGLE"  />

<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
    <constructor-arg value="replicatedMap"/>
</bean>

Java 配置示例:

以下示例展示了 DistributedMap 配置。 相同的配置可用于其他分布式数据结构(IMapMultiMapReplicatedMapIListISetIQueueITopic):

@Bean
public PollableChannel distributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hazelcastInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
    final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
    producer.setOutputChannel(distributedMapChannel());
    producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
    producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);

    return producer;
}

Hazelcast 连续查询入站通道适配器

Hazelcast 连续查询允许监听对特定 Map 条目执行的修改。 Hazelcast 连续查询入站通道适配器是一个事件驱动的通道适配器,它根据定义的谓词监听相关的分布式 Map 事件。

  • Java

  • XML

@Bean
public PollableChannel cqDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> cqDistributedMap() {
    return hazelcastInstance().getMap("CQ_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
    final HazelcastContinuousQueryMessageProducer producer =
        new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
    producer.setOutputChannel(cqDistributedMapChannel());
    producer.setCacheEventTypes("UPDATED");
    producer.setIncludeValue(false);

    return producer;
}
<int:channel id="cqMapChannel"/>

<int-hazelcast:cq-inbound-channel-adapter
                channel="cqMapChannel"
                cache="cqMap"
                cache-events="UPDATED, REMOVED"
                predicate="name=TestName AND surname=TestSurname"
                include-value="true"
                cache-listening-policy="SINGLE"/>

<bean id="cqMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="cqMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它支持以下六个属性:

  • channel:指定消息发送到的通道;

  • cache:指定被监听的分布式 Map 引用。 强制;

  • cache-events:指定要监听的缓存事件。 可选属性,默认值为 ADDED。 支持的值为 ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL

  • predicate:指定一个谓词,用于监听对特定 Map 条目执行的修改。 强制;

  • include-value:指定连续查询结果中是否包含值和旧值。 可选属性,默认值为 true

  • cache-listening-policy:指定缓存监听策略为 SINGLEALL。 可选属性,默认值为 SINGLE。 每个监听相同缓存对象并具有相同 cache-events 属性的 Hazelcast CQ 入站通道适配器,可以接收单个事件消息或所有事件消息。 如果设置为 ALL,则所有监听相同缓存对象并具有相同 cache-events 属性的 Hazelcast CQ 入站通道适配器将接收所有事件消息。 如果设置为 SINGLE,它们将接收唯一的事件消息。

Hazelcast 集群监视器入站通道适配器

Hazelcast 集群监视器支持监听对集群执行的修改。 Hazelcast 集群监视器入站通道适配器是一个事件驱动的通道适配器,它监听相关的成员资格、分布式对象、迁移、生命周期和客户端事件:

  • Java

  • XML

@Bean
public PollableChannel eventChannel() {
    return new QueueChannel();
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
    HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
    producer.setOutputChannel(eventChannel());
    producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");

    return producer;
}
<int:channel id="monitorChannel"/>

<int-hazelcast:cm-inbound-channel-adapter
                 channel="monitorChannel"
                 hazelcast-instance="instance"
                 monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它支持以下三个属性:

  • channel:指定消息发送到的通道;

  • hazelcast-instance:指定要监听集群事件的 Hazelcast 实例引用。 这是一个强制属性;

  • monitor-types:指定要监听的监视器类型。 这是一个可选属性,默认值为 MEMBERSHIP。 支持的值为 MEMBERSHIPDISTRIBUTED_OBJECTMIGRATIONLIFECYCLECLIENT

Hazelcast 分布式 SQL 入站通道适配器

Hazelcast 允许在分布式 Map 上运行分布式查询。 Hazelcast 分布式 SQL 入站通道适配器是一个轮询入站通道适配器。 它运行定义的分布式 SQL 命令并根据迭代类型返回结果。

  • Java

  • XML

@Bean
public PollableChannel dsDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> dsDistributedMap() {
    return hazelcastInstance().getMap("DS_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
    final HazelcastDistributedSQLMessageSource messageSource =
        new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
            "name='TestName' AND surname='TestSurname'");
    messageSource.setIterationType(DistributedSQLIterationType.ENTRY);

    return messageSource;
}
<int:channel id="dsMapChannel"/>

<int-hazelcast:ds-inbound-channel-adapter
            channel="dsMapChannel"
            cache="dsMap"
            iteration-type="ENTRY"
            distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
    <int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>

<bean id="dsMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="dsMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它需要一个轮询器并支持四个属性:

  • channel:指定消息发送到的通道。 这是一个强制属性;

  • cache:指定被查询的分布式 IMap 引用。 这是一个强制属性;

  • iteration-type:指定结果类型。 分布式 SQL 可以在 EntrySetKeySetLocalKeySetValues 上运行。 这是一个可选属性,默认值为 VALUE。 支持的值为 ENTRYKEYLOCAL_KEYVALUE

  • distributed-sql:指定 SQL 语句的 where 子句。 这是一个强制属性。

Hazelcast 出站通道适配器

Hazelcast 出站通道适配器监听其定义的通道并将传入消息写入相关的分布式缓存。 它需要 cachecache-expressionHazelcastHeaders.CACHE_NAME 之一用于分布式对象定义。 支持的分布式对象有:IMapMultiMapReplicatedMapIListISetIQueueITopic

  • Java

  • XML

@Bean
public MessageChannel distributedMapChannel() {
    return new DirectChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hzInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hzInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
    HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
    handler.setDistributedObject(distributedMap());
    handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
    handler.setExtractPayload(true);
    return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
                    cache-expression="headers['CACHE_HEADER']"
                    key-expression="payload.key"
                    extract-payload="true"/>

它需要以下属性:

  • channel:指定消息发送到的通道;

  • cache:指定分布式对象引用。 可选;

  • cache-expression:通过 Spring 表达式语言 (SpEL) 指定分布式对象。 可选;

  • key-expression:通过 Spring 表达式语言 (SpEL) 指定键值对的键。 可选,仅 IMapMultiMapReplicatedMap 分布式数据结构需要。

  • extract-payload:指定是发送整个消息还是只发送有效负载。 可选属性,默认值为 true。 如果为 true,则只将有效负载写入分布式对象。 否则,将通过转换消息头和有效负载来写入整个消息。

通过在消息头中设置分布式对象名称,可以通过同一通道将消息写入不同的分布式对象。 如果未定义 cachecache-expression 属性,则必须在请求 Message 中设置 HazelcastHeaders.CACHE_NAME 消息头。

Hazelcast 领导者选举

如果需要领导者选举(例如,对于高可用性消息消费者,其中只有一个节点应该接收消息),可以使用基于 Hazelcast 的 LeaderInitiator

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public LeaderInitiator initiator() {
    return new LeaderInitiator(hazelcastInstance());
}

当一个节点被选为领导者时,它将向所有应用程序监听器发送一个 OnGrantedEvent

LeaderInitiator 自版本 6.5 起已弃用,因为它依赖于 CP Subsystem,而 CP Subsystem 已于 Hazelcast 5.5 之后从开源中移除。

Hazelcast 消息存储

对于分布式消息状态管理,例如持久化 QueueChannel 或跟踪 Aggregator 消息组,提供了 HazelcastMessageStore 实现:

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MessageGroupStore messageStore() {
    return new HazelcastMessageStore(hazelcastInstance());
}

默认情况下,SPRING_INTEGRATION_MESSAGE_STORE IMap 用于将消息和组存储为键/值。 任何自定义的 IMap 都可以提供给 HazelcastMessageStore

Hazelcast 元数据存储

使用 Hazelcast IMap 作为支持的 ListenableMetadataStore 实现可用。 默认 Map 以名称 SPRING_INTEGRATION_METADATA_STORE 创建,该名称可以自定义。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MetadataStore metadataStore() {
    return new HazelcastMetadataStore(hazelcastInstance());
}

HazelcastMetadataStore 实现了 ListenableMetadataStore,它允许你注册自己的 MetadataStoreListener 类型的监听器,通过 addListener(MetadataStoreListener callback) 监听事件。

Hazelcast 锁注册表

可以使用 Hazelcast 分布式 ILock 支持来实现 LockRegistry

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public LockRegistry lockRegistry() {
    return new HazelcastLockRegistry(hazelcastInstance());
}

HazelcastLockRegistry 自版本 6.5 起已弃用,因为它依赖于 CP Subsystem,而 CP Subsystem 已于 Hazelcast 5.5 之后从开源中移除。

当与共享的 MessageGroupStore(例如 Aggregator 存储管理)一起使用时,HazelcastLockRegistry 可用于在多个应用程序实例中提供此功能,以便一次只有一个实例可以操作该组。

对于所有分布式操作,必须在 HazelcastInstance 上启用 CP 子系统。

使用 Hazelcast 的消息通道

Hazelcast IQueueITopic 分布式对象本质上是消息传递原语,可以与 Spring Integration 核心组件一起使用,而无需在此 Hazelcast 模块中进行额外的实现。

QueueChannel 可以由任何 java.util.Queue 提供,包括前面提到的 Hazelcast 分布式 IQueue

@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
    return new QueueChannel(hazelcastInstance.getQueue("springIntegrationQueue"));
}

将此配置放置在应用程序的 Hazelcast 集群中的多个节点上,将使 QueueChannel 成为分布式通道,并且只有一个节点能够从该 IQueue 轮询单个 Message。 这类似于 PollableJmsChannelPollableKafkaChannelPollableAmqpChannel

如果生产者端不是 Spring Integration 应用程序,则无法配置 QueueChannel,因此使用普通的 Hazelcast IQueue API 来生成数据。 在这种情况下,消费者端的 QueueChannel 方法是错误的:必须使用 入站通道适配器 解决方案:

@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue("springIntegrationQueue");
}

@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
    return myStringHzQueue::poll;
}

Hazelcast 中的 ITopic 抽象具有与 JMS 中的 Topic 类似的语义:所有订阅者都接收已发布的消息。 通过一对简单的 MessageChannel bean,此机制作为开箱即用的功能得到支持:

@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
        MessageChannel fromHazelcastTopicChannel) {

    ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
	topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
	return topic;
}

@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
    return new FixedSubscriberChannel(springIntegrationTopic::publish);
}

@Bean
public MessageChannel fromHazelcastTopicChannel() {
    return new DirectChannel();
}

FixedSubscriberChannelDirectChannel 的优化变体,它在初始化时需要一个 MessageHandler。 由于 MessageHandler 是一个函数式接口,因此可以为 handleMessage 方法提供一个简单的 lambda 表达式。 当消息发送到 publishToHazelcastTopicChannel 时,它只是发布到 Hazelcast ITopiccom.hazelcast.topic.MessageListener 也是一个函数式接口,因此可以为 ITopic#addMessageListener 提供一个 lambda 表达式。 因此,fromHazelcastTopicChannel 的订阅者将消费所有发送到上述 ITopic 的消息。

ExecutorChannel 可以与 IExecutorService 一起提供。 例如,通过相应的配置,可以实现集群范围内的单例:

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(
                new Config()
                    .addExecutorConfig(new ExecutorConfig()
                         .setName("singletonExecutor")
                         .setPoolSize(1)));
}

@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
    return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}