Hazelcast Support
Spring Integration provides channel adapters and other utility components to interact with the Hazelcast in-memory data grid.
You need to include this dependency in your project:
Maven:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>{project-version}</version>
</dependency>
Gradle:
compile "org.springframework.integration:spring-integration-hazelcast:{project-version}"
The XML namespace and schemaLocation definitions for Hazelcast components are:
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"
Hazelcast Event-driven Inbound Channel Adapter
Hazelcast provides distributed data structures such as:
- com.hazelcast.map.IMap
- com.hazelcast.multimap.MultiMap
- com.hazelcast.collection.IList
- com.hazelcast.collection.ISet
- com.hazelcast.collection.IQueue
- com.hazelcast.topic.ITopic
- com.hazelcast.replicatedmap.ReplicatedMap
It also provides event listeners for listening to modifications made to these data structures:
- com.hazelcast.core.EntryListener<K, V>
- com.hazelcast.collection.ItemListener
- com.hazelcast.topic.MessageListener
The Hazelcast Event-Driven Inbound Channel Adapter listens to related cache events and sends event messages to the defined channel. It supports both XML and JavaConfig-driven configuration.
XML Configuration:
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
The Hazelcast Event-Driven Inbound Channel Adapter accepts the following attributes:
- channel: Specifies the channel to which messages are sent;
- cache: Specifies the distributed object reference which is listened to. It is a mandatory attribute;
- cache-events: Specifies the cache events which are listened for. It is an optional attribute and its default value is ADDED. Its supported values are as follows:
  - Supported cache event types for IMap and MultiMap: ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL and CLEAR_ALL;
  - Supported cache event types for ReplicatedMap: ADDED, REMOVED, UPDATED, EVICTED;
  - Supported cache event types for IList, ISet and IQueue: ADDED, REMOVED. There are no cache event types for ITopic.
- cache-listening-policy: Specifies the cache listening policy as SINGLE or ALL. It is an optional attribute and its default value is SINGLE. The policy applies to Hazelcast inbound channel adapters that listen to the same cache object with the same cache-events attribute. If it is ALL, all of these adapters receive all event messages. If it is SINGLE, they receive unique event messages.
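The supported cache-events values per data structure listed above can be captured in a small lookup. The following is a minimal plain-Java sketch, not part of the Hazelcast module: the class CacheEventSupport, its enums, and the method isListedAsSupported are hypothetical names introduced for illustration, and ITopic is left out because it has no cache event types.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper for illustration only; it does not reproduce the adapter's own validation.
final class CacheEventSupport {

    enum CacheEvent { ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL, CLEAR_ALL }

    enum Structure { IMAP, MULTI_MAP, REPLICATED_MAP, ILIST, ISET, IQUEUE }

    // Event types listed as supported for each structure in the attribute description.
    static Set<CacheEvent> supported(Structure structure) {
        switch (structure) {
            case IMAP:
            case MULTI_MAP:
                return EnumSet.allOf(CacheEvent.class);
            case REPLICATED_MAP:
                return EnumSet.of(CacheEvent.ADDED, CacheEvent.REMOVED,
                        CacheEvent.UPDATED, CacheEvent.EVICTED);
            case ILIST:
            case ISET:
            case IQUEUE:
                return EnumSet.of(CacheEvent.ADDED, CacheEvent.REMOVED);
            default:
                throw new IllegalArgumentException("Unknown structure: " + structure);
        }
    }

    // Parses a comma-separated value such as "UPDATED, REMOVED".
    static Set<CacheEvent> parse(String cacheEvents) {
        Set<CacheEvent> events = EnumSet.noneOf(CacheEvent.class);
        for (String token : cacheEvents.split(",")) {
            // valueOf throws IllegalArgumentException for names outside the enum
            events.add(CacheEvent.valueOf(token.trim()));
        }
        return events;
    }

    // True when every requested event is listed as supported for the structure.
    static boolean isListedAsSupported(Structure structure, String cacheEvents) {
        return supported(structure).containsAll(parse(cacheEvents));
    }

    public static void main(String[] args) {
        System.out.println(isListedAsSupported(Structure.IMAP, "UPDATED, REMOVED")); // true
        System.out.println(isListedAsSupported(Structure.ILIST, "UPDATED"));         // false
    }
}
```

A check like this could run before wiring an adapter, so that a cache-events value that does not match the target data structure is caught early.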
Some configuration samples:
<int:channel id="mapChannel"/>
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />
<bean id="map" factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />
<bean id="list" factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean id="set" factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />
<bean id="queue" factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean id="topic" factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>
Java Configuration Sample:
The following sample shows a DistributedMap configuration.
The same configuration can be used for other distributed data structures (IMap, MultiMap, ReplicatedMap, IList, ISet, IQueue and ITopic):
@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
return producer;
}
Hazelcast Continuous Query Inbound Channel Adapter
Hazelcast Continuous Query enables listening to modifications performed on specific map entries. The Hazelcast Continuous Query Inbound Channel Adapter is an event-driven channel adapter which listens to the related distributed map events according to the defined predicate.
A Java configuration sample, followed by an XML configuration sample:
@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);
return producer;
}
<int:channel id="cqMapChannel"/>
<int-hazelcast:cq-inbound-channel-adapter
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="cqMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
It supports six attributes as follows:
- channel: Specifies the channel to which messages are sent;
- cache: Specifies the distributed map reference which is listened to. Mandatory;
- cache-events: Specifies the cache events which are listened for. Optional, with ADDED being its default value. Supported values are ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL and CLEAR_ALL;
- predicate: Specifies a predicate to listen to the modifications performed on specific map entries. Mandatory;
- include-value: Specifies whether the value and oldValue are included in a continuous query result. Optional, with true being the default;
- cache-listening-policy: Specifies the cache listening policy as SINGLE or ALL. Optional, with the default value being SINGLE. The policy applies to Hazelcast CQ inbound channel adapters that listen to the same cache object with the same cache-events attribute. If it is ALL, all of these adapters receive all event messages. If it is SINGLE, they receive unique event messages.
Hazelcast Cluster Monitor Inbound Channel Adapter
A Hazelcast Cluster Monitor supports listening to modifications performed on the cluster. The Hazelcast Cluster Monitor Inbound Channel Adapter is an event-driven channel adapter which listens to related Membership, Distributed Object, Migration, Lifecycle and Client events:
A Java configuration sample, followed by an XML configuration sample:
@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
return producer;
}
<int:channel id="monitorChannel"/>
<int-hazelcast:cm-inbound-channel-adapter
channel="monitorChannel"
hazelcast-instance="instance"
monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
It supports three attributes as follows:
- channel: Specifies the channel to which messages are sent;
- hazelcast-instance: Specifies the Hazelcast instance reference to listen to for cluster events. It is a mandatory attribute;
- monitor-types: Specifies the monitor types which are listened for. It is an optional attribute with MEMBERSHIP being the default value. Supported values are MEMBERSHIP, DISTRIBUTED_OBJECT, MIGRATION, LIFECYCLE and CLIENT.
Hazelcast Distributed SQL Inbound Channel Adapter
Hazelcast allows running distributed queries on the distributed map. The Hazelcast Distributed SQL Inbound Channel Adapter is a polling inbound channel adapter. It runs the defined distributed-sql command and returns results depending on the iteration type.
A Java configuration sample, followed by an XML configuration sample:
@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
return messageSource;
}
<int:channel id="dsMapChannel"/>
<int-hazelcast:ds-inbound-channel-adapter
channel="dsMapChannel"
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="dsMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
It requires a poller and supports four attributes:
- channel: Specifies the channel to which messages are sent. It is a mandatory attribute;
- cache: Specifies the distributed IMap reference which is queried. It is a mandatory attribute;
- iteration-type: Specifies the result type. Distributed SQL can be run on EntrySet, KeySet, LocalKeySet or Values. It is an optional attribute with VALUE being the default. Supported values are ENTRY, KEY, LOCAL_KEY and VALUE;
- distributed-sql: Specifies the where clause of the SQL statement. It is a mandatory attribute.
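To make the effect of iteration-type concrete, the following plain-Java sketch filters a local map and shapes the result as entries, keys, or values. It is only a model under stated assumptions: it does not call Hazelcast, a java.util.function.Predicate stands in for the distributed-sql where clause, LOCAL_KEY is omitted because it only has meaning on a cluster member, and the class IterationTypeSketch is a hypothetical name.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of ENTRY, KEY and VALUE iteration types; not the adapter's implementation.
final class IterationTypeSketch {

    enum IterationType { ENTRY, KEY, VALUE }

    // Applies the filter (standing in for the where clause) to each value
    // and collects entries, keys or values depending on the iteration type.
    static <K, V> List<Object> query(Map<K, V> map, Predicate<V> where, IterationType type) {
        List<Object> result = new ArrayList<>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            if (!where.test(entry.getValue())) {
                continue;
            }
            switch (type) {
                case ENTRY:
                    result.add(new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue()));
                    break;
                case KEY:
                    result.add(entry.getKey());
                    break;
                default:
                    result.add(entry.getValue());
                    break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> names = new LinkedHashMap<>();
        names.put(1, "TestName");
        names.put(2, "OtherName");
        names.put(3, "TestName");
        System.out.println(query(names, "TestName"::equals, IterationType.KEY)); // [1, 3]
    }
}
```

With VALUE, the same call returns the matching values themselves, which corresponds to the default described above.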
Hazelcast Outbound Channel Adapter
The Hazelcast Outbound Channel Adapter listens to its defined channel and writes incoming messages to the related distributed cache.
It expects one of cache, cache-expression or HazelcastHeaders.CACHE_NAME for the distributed object definition.
Supported distributed objects are: IMap, MultiMap, ReplicatedMap, IList, ISet, IQueue and ITopic.
A Java configuration sample, followed by an XML configuration sample:
@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
cache-expression="headers['CACHE_HEADER']"
key-expression="payload.key"
extract-payload="true"/>
It accepts the following attributes:
- channel: Specifies the channel to which messages are sent;
- cache: Specifies the distributed object reference. Optional;
- cache-expression: Specifies the distributed object via Spring Expression Language (SpEL). Optional;
- key-expression: Specifies the key of a key-value pair via Spring Expression Language (SpEL). Optional in general, but required for the IMap, MultiMap and ReplicatedMap distributed data structures;
- extract-payload: Specifies whether to send the whole message or just the payload. Optional attribute with true being the default. If it is true, just the payload is written to the distributed object. Otherwise, the whole message is written by converting both message headers and payload.
By setting the distributed object name in a header, messages can be written to different distributed objects via the same channel.
If neither the cache nor the cache-expression attribute is defined, a HazelcastHeaders.CACHE_NAME header has to be set in the request Message.
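The header-based routing described above can be pictured with a small plain-Java model. This is a sketch under assumptions, not the adapter's implementation: the class CacheRoutingSketch, the header key "cacheName" (a stand-in for HazelcastHeaders.CACHE_NAME), the in-memory registry of maps, and the choice to throw when the header is missing are all introduced for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of routing payloads to a cache chosen by a message header.
final class CacheRoutingSketch {

    // Assumed stand-in for the HazelcastHeaders.CACHE_NAME header key.
    static final String CACHE_NAME_HEADER = "cacheName";

    // Stand-in for the distributed objects, looked up by name.
    private final Map<String, Map<Object, Object>> caches = new HashMap<>();

    // Writes the payload under the given key into the cache named in the headers.
    void handle(Map<String, Object> headers, Object key, Object payload) {
        Object cacheName = headers.get(CACHE_NAME_HEADER);
        if (cacheName == null) {
            // Assumption: without cache or cache-expression, the header is the only source of the name.
            throw new IllegalStateException("No cache name header present");
        }
        caches.computeIfAbsent(cacheName.toString(), name -> new HashMap<>()).put(key, payload);
    }

    // Returns a read-only view of the named cache, empty if nothing was written to it.
    Map<Object, Object> cache(String name) {
        Map<Object, Object> cache = caches.get(name);
        return cache == null ? Collections.emptyMap() : Collections.unmodifiableMap(cache);
    }

    public static void main(String[] args) {
        CacheRoutingSketch sketch = new CacheRoutingSketch();
        sketch.handle(Collections.singletonMap(CACHE_NAME_HEADER, "orders"), 1, "first order");
        sketch.handle(Collections.singletonMap(CACHE_NAME_HEADER, "users"), 7, "alice");
        System.out.println(sketch.cache("orders"));
        System.out.println(sketch.cache("users"));
    }
}
```

Both messages travel through the same handle method, yet land in different maps, which mirrors sending to different distributed objects over one channel.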
Hazelcast Leader Election
If leader election is needed (for example, for a highly available message consumer where only one node should receive messages), a Hazelcast-based LeaderInitiator can be used:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LeaderInitiator initiator() {
return new LeaderInitiator(hazelcastInstance());
}
When a node is elected leader, it sends an OnGrantedEvent to all application listeners.
Hazelcast Message Store
For distributed messaging state management, for example for a persistent QueueChannel or for tracking Aggregator message groups, the HazelcastMessageStore implementation is provided:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
}
By default, the SPRING_INTEGRATION_MESSAGE_STORE IMap is used to store messages and groups as key/value pairs.
Any custom IMap can be provided to the HazelcastMessageStore.
Hazelcast Metadata Store
An implementation of ListenableMetadataStore is available, backed by a Hazelcast IMap.
By default, the map is created with the name SPRING_INTEGRATION_METADATA_STORE, which can be customized.
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}
The HazelcastMetadataStore implements ListenableMetadataStore, which allows you to register your own listeners of type MetadataStoreListener to listen for events via addListener(MetadataStoreListener callback).
Hazelcast Lock Registry
An implementation of LockRegistry is available, backed by Hazelcast distributed ILock support:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LockRegistry lockRegistry() {
return new HazelcastLockRegistry(hazelcastInstance());
}
When used with a shared MessageGroupStore (for example, for Aggregator store management), the HazelcastLockRegistry can be used to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
For all the distributed operations the CP Subsystem must be enabled on |
Message Channels with Hazelcast
The Hazelcast IQueue and ITopic distributed objects are, essentially, messaging primitives and can be used with Spring Integration core components without extra implementations in this Hazelcast module.
The QueueChannel can be supplied with any java.util.Queue, including the mentioned Hazelcast distributed IQueue:
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
return new QueueChannel(hazelcastInstance.<Message<?>>getQueue("springIntegrationQueue"));
}
Placing this configuration on several nodes of the application's Hazelcast cluster makes the QueueChannel distributed, and only one node is able to poll any single Message from that IQueue.
This works similarly to PollableJmsChannel, PollableKafkaChannel or PollableAmqpChannel.
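The statement that each Message is polled by only one node follows from plain queue semantics: poll() removes the element it returns. The sketch below models this with a local java.util.Queue and two consumers that take turns polling. It is an illustration only: ConcurrentLinkedQueue stands in for the distributed IQueue, and the class CompetingConsumersSketch and the "node" lists are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Local model of competing consumers on a shared queue; no Hazelcast involved.
final class CompetingConsumersSketch {

    // Two consumers alternate polling the shared queue until it is empty.
    // Returns the messages received by the first and the second consumer.
    static List<List<String>> drain(Queue<String> sharedQueue) {
        List<String> nodeA = new ArrayList<>();
        List<String> nodeB = new ArrayList<>();
        boolean turnOfA = true;
        String message;
        while ((message = sharedQueue.poll()) != null) {
            // poll() removed the message, so the other consumer can never see it
            (turnOfA ? nodeA : nodeB).add(message);
            turnOfA = !turnOfA;
        }
        return Arrays.asList(nodeA, nodeB);
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>(Arrays.asList("m1", "m2", "m3"));
        System.out.println(drain(queue)); // [[m1, m3], [m2]]
    }
}
```

Every message ends up in exactly one of the two lists, which is the behaviour the distributed QueueChannel relies on.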
If the producer side is not a Spring Integration application, there is no way to configure a QueueChannel, and therefore the plain Hazelcast IQueue API is used to produce the data.
In this case, the QueueChannel approach is wrong on the consumer side: an Inbound Channel Adapter solution must be used instead:
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
return hazelcastInstance.getQueue("springIntegrationQueue");
}
@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
return myStringHzQueue::poll;
}
The ITopic abstraction in Hazelcast has similar semantics to a Topic in JMS: all subscribers receive published messages.
With a pair of simple MessageChannel beans, this mechanism is supported as an out-of-the-box feature:
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
MessageChannel fromHazelcastTopicChannel) {
ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
return topic;
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
return new FixedSubscriberChannel(springIntegrationTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}
The FixedSubscriberChannel is an optimized variant of DirectChannel, which requires a MessageHandler on initialization.
Since MessageHandler is a functional interface, a simple lambda for the handleMessage method can be provided.
When a message is sent to the publishToHazelcastTopicChannel, it is just published onto the Hazelcast ITopic.
The com.hazelcast.topic.MessageListener is a functional interface, too, hence a lambda can be provided to ITopic#addMessageListener.
So, a subscriber to the fromHazelcastTopicChannel consumes all messages sent to the mentioned ITopic.
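The topic semantics used here, where every registered listener receives every published message and listeners are supplied as lambdas or method references, can be modelled in a few lines of plain Java. This is a sketch only: TopicSketch is a hypothetical local class, java.util.function.Consumer stands in for the listener interface, and nothing is distributed across nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Local model of publish/subscribe delivery; it does not use Hazelcast or Spring Integration.
final class TopicSketch<T> {

    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    // Registers a listener; any lambda or method reference accepting T fits.
    void addMessageListener(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Delivers the message to every registered listener, in registration order.
    void publish(T message) {
        for (Consumer<T> listener : listeners) {
            listener.accept(message);
        }
    }

    public static void main(String[] args) {
        TopicSketch<String> topic = new TopicSketch<>();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        topic.addMessageListener(first::add);
        topic.addMessageListener(second::add);
        topic.publish("hello");
        System.out.println(first + " " + second); // [hello] [hello]
    }
}
```

In contrast to the queue case, no listener removes the message for the others, so both subscribers observe the same sequence.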
An ExecutorChannel can be supplied with an IExecutorService.
For example, with the respective configuration, a cluster-wide singleton can be achieved:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(
new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}