Redis 支持
Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。
此支持以基于 Redis 的 MessageStore
以及发布-订阅消息适配器的形式提供,这些适配器通过 Redis 的 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令提供支持。
你需要将此依赖项包含到你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:{project-version}"
你还需要包含 Redis 客户端依赖,例如 Lettuce。 要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,首先需要连接到它。
Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 构造:ConnectionFactory
和 Template
。
这些抽象简化了与多个 Redis 客户端 Java API 的集成。
目前,Spring Data Redis 支持 Jedis 和 Lettuce。
使用 RedisConnectionFactory
要连接到 Redis,你可以使用 RedisConnectionFactory
接口的其中一个实现。
以下列表显示了接口定义:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例显示了如何在 Java 中创建 LettuceConnectionFactory
:
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例显示了如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory
:
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory
的实现提供了一组属性,例如端口和主机,你可以根据需要进行设置。
获得 RedisConnectionFactory
实例后,你可以创建 RedisTemplate
实例并使用 RedisConnectionFactory
注入它。
使用 RedisTemplate
与 Spring 中的其他模板类(例如 JdbcTemplate
和 JmsTemplate
)一样,RedisTemplate
是一个简化 Redis 数据访问代码的辅助类。
有关 RedisTemplate
及其变体(例如 StringRedisTemplate
)的更多信息,请参阅 Spring Data Redis 文档。
以下示例显示了如何在 Java 中创建 RedisTemplate
实例:
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例显示了如何在 Spring 的 XML 配置中创建 RedisTemplate
实例:
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息传递
如 [the introduction] 中所述,Redis 通过其 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令提供发布-订阅消息传递支持。
与 JMS 和 AMQP 一样,Spring Integration 提供消息通道和适配器,用于通过 Redis 发送和接收消息。
Redis 发布/订阅通道
与 JMS 类似,在某些情况下,生产者和消费者都旨在成为同一应用程序的一部分,并在同一进程中运行。 你可以通过使用一对入站和出站通道适配器来实现此目的。 但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决此用例。 你可以创建一个发布-订阅通道,如以下示例所示:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel
的行为与主 Spring Integration 命名空间中的普通 <publish-subscribe-channel/>
元素非常相似。
它可以被任何端点的 input-channel
和 output-channel
属性引用。
不同之处在于,此通道由 Redis 主题名称支持:一个由 topic-name
属性指定的 String
值。
但是,与 JMS 不同,此主题不必提前创建,甚至不必由 Redis 自动创建。
在 Redis 中,主题是简单的 String
值,充当地址的角色。
生产者和消费者可以使用相同的 String
值作为其主题名称进行通信。
简单订阅此通道意味着生产者和消费者端点之间可以进行异步发布-订阅消息传递。
但是,与通过在简单 Spring Integration <channel/>
元素中添加 <queue/>
元素创建的异步消息通道不同,消息不存储在内存队列中。
相反,这些消息通过 Redis 传递,这使你可以依赖其持久性和集群支持以及与其他非 Java 平台的互操作性。
Redis 入站通道适配器
Redis 入站通道适配器 (RedisInboundChannelAdapter
) 以与其他入站适配器相同的方式将传入的 Redis 消息适配为 Spring 消息。
它接收特定于平台的消息(在本例中为 Redis)并使用 MessageConverter
策略将它们转换为 Spring 消息。
以下示例显示了如何配置 Redis 入站通道适配器:
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。
请注意,前面的配置依赖于熟悉的 Spring 自动发现某些 bean 的范例。
在本例中,redisConnectionFactory
被隐式注入到适配器中。
你可以通过使用 connection-factory
属性来显式指定它。
此外,请注意,前面的配置使用自定义 MessageConverter
注入适配器。
该方法类似于 JMS,其中 MessageConverter
实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。
默认是 SimpleMessageConverter
。
入站适配器可以订阅多个主题名称,因此 topics
属性中有一组逗号分隔的值。
从 3.0 版本开始,入站适配器除了现有的 topics
属性外,现在还具有 topic-patterns
属性。
此属性包含一组逗号分隔的 Redis 主题模式。
有关 Redis 发布-订阅的更多信息,请参阅 Redis Pub/Sub。
入站适配器可以使用 RedisSerializer
反序列化 Redis 消息的主体。
<int-redis:inbound-channel-adapter>
的 serializer
属性可以设置为空字符串,这会导致 RedisSerializer
属性的值为 null
。
在这种情况下,Redis 消息的原始 byte[]
主体作为消息有效负载提供。
从 5.0 版本开始,你可以通过使用 <int-redis:inbound-channel-adapter>
的 task-executor
属性向入站适配器提供 Executor
实例。
此外,收到的 Spring Integration 消息现在具有 RedisHeaders.MESSAGE_SOURCE
标头,以指示已发布消息的来源:主题或模式。
你可以将此用于下游路由逻辑。
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息适配为 Redis 消息。
它接收 Spring Integration 消息并使用 MessageConverter
策略将它们转换为特定于平台的消息(在本例中为 Redis)。
以下示例显示了如何配置 Redis 出站通道适配器:
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
该配置与 Redis 入站通道适配器并行。
适配器被隐式注入 RedisConnectionFactory
,该工厂的 bean 名称定义为 redisConnectionFactory
。
此示例还包括可选的(和自定义的)MessageConverter
(testConverter
bean)。
从 Spring Integration 3.0 开始,<int-redis:outbound-channel-adapter>
提供了 topic
属性的替代方案:你可以使用 topic-expression
属性在运行时确定消息的 Redis 主题。
这些属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了队列入站通道适配器,用于从 Redis 列表中"`弹出`"消息。 默认情况下,它使用"`右弹出`",但你可以将其配置为使用"`左弹出`"。 适配器是消息驱动的。 它使用内部侦听器线程,不使用轮询器。
以下列表显示了 queue-inbound-channel-adapter
的所有可用属性:
<int-redis:queue-inbound-channel-adapter id="" [id="CO1-1"]1
channel="" [id="CO1-2"]2
auto-startup="" [id="CO1-3"]3
phase="" [id="CO1-4"]4
connection-factory="" [id="CO1-5"]5
queue="" [id="CO1-6"]6
error-channel="" [id="CO1-7"]7
serializer="" [id="CO1-8"]8
receive-timeout="" [id="CO1-9"]9
recovery-interval="" [id="CO1-10"]10
expect-message="" [id="CO1-11"]11
task-executor="" [id="CO1-12"]12
right-pop=""/> [id="CO1-13"]13
<1> 组件 bean 名称。 如果你不提供 `channel` 属性,则会创建一个 `DirectChannel`,并以此 `id` 属性作为 bean 名称在应用程序上下文中注册。 在这种情况下,端点本身以 `id` 加 `.adapter` 的 bean 名称注册。 (如果 bean 名称是 `thing1`,则端点注册为 `thing1.adapter`。) <1> 此端点将 `Message` 实例发送到的 `MessageChannel`。 <1> 一个 `SmartLifecycle` 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。 它默认为 `true`。 <1> 一个 `SmartLifecycle` 属性,用于指定此端点启动的阶段。 它默认为 `0`。 <1> 对 `RedisConnectionFactory` bean 的引用。 它默认为 `redisConnectionFactory`。 <1> 执行基于队列的“pop”操作以获取 Redis 消息的 Redis 列表的名称。 <1> 当从端点的侦听任务收到异常时,将 `ErrorMessage` 实例发送到的 `MessageChannel`。 默认情况下,底层 `MessagePublishingErrorHandler` 使用应用程序上下文中的默认 `errorChannel`。 <1> `RedisSerializer` bean 引用。 它可以是空字符串,表示“无序列化器”。 在这种情况下,入站 Redis 消息的原始 `byte[]` 作为 `Message` 有效负载发送到 `channel`。 默认情况下,它是 `JdkSerializationRedisSerializer`。 <1> “pop”操作等待来自队列的 Redis 消息的超时时间(以毫秒为单位)。 默认值为 1 秒。 <1> 侦听器任务在“pop”操作发生异常后应休眠的时间(以毫秒为单位),然后重新启动侦听器任务。 <1> 指定此端点是否期望 Redis 队列中的数据包含整个 `Message` 实例。 如果此属性设置为 `true`,则 `serializer` 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 其默认值为 `false`。 <1> 对 Spring `TaskExecutor`(或标准 JDK 1.5+ `Executor`)bean 的引用。 它用于底层侦听任务。 它默认为 `SimpleAsyncTaskExecutor`。 <1> 指定此端点是应使用“右弹出”(当为 `true` 时)还是“左弹出”(当为 `false` 时)从 Redis 列表中读取消息。 如果为 `true`,则与默认 Redis 队列出站通道适配器一起使用时,Redis 列表充当 `FIFO` 队列。 将其设置为 `false` 以与使用“右推”写入列表的软件一起使用,或实现堆栈式消息顺序。 其默认值为 `true`。 从 4.3 版本开始。
task-executor
必须配置为具有多个线程进行处理;否则,当 RedisQueueMessageDrivenEndpoint
尝试在错误后重新启动侦听器任务时,可能会发生死锁。
errorChannel
可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露给可能的死锁情况。
有关可能的 TaskExecutor
实现,请参阅 Spring Framework 参考手册。
Redis 队列出站通道适配器
Spring Integration 3.0 引入了队列出站通道适配器,用于从 Spring Integration 消息"`推`"到 Redis 列表。
默认情况下,它使用"`左推`",但你可以将其配置为使用"`右推`"。
以下列表显示了 Redis queue-outbound-channel-adapter
的所有可用属性:
<int-redis:queue-outbound-channel-adapter id="" [id="CO2-1"]1
channel="" [id="CO2-2"]2
connection-factory="" [id="CO2-3"]3
queue="" [id="CO2-4"]4
queue-expression="" [id="CO2-5"]5
serializer="" [id="CO2-6"]6
extract-payload="" [id="CO2-7"]7
left-push=""/> [id="CO2-8"]8
<1> 组件 bean 名称。 如果你不提供 `channel` 属性,则会创建一个 `DirectChannel`,并以此 `id` 属性作为 bean 名称在应用程序上下文中注册。 在这种情况下,端点以 `id` 加 `.adapter` 的 bean 名称注册。 (如果 bean 名称是 `thing1`,则端点注册为 `thing1.adapter`。) <1> 此端点从中接收 `Message` 实例的 `MessageChannel`。 <1> 对 `RedisConnectionFactory` bean 的引用。 它默认为 `redisConnectionFactory`。 <1> 执行基于队列的“push”操作以发送 Redis 消息的 Redis 列表的名称。 此属性与 `queue-expression` 互斥。 <1> 一个 SpEL `Expression`,用于确定 Redis 列表的名称。 它在运行时使用传入的 `Message` 作为 `#root` 变量。 此属性与 `queue` 互斥。 <1> 一个 `RedisSerializer` bean 引用。 它默认为 `JdkSerializationRedisSerializer`。 但是,对于 `String` 有效负载,如果未提供 `serializer` 引用,则使用 `StringRedisSerializer`。 <1> 指定此端点是应仅将有效负载还是将整个 `Message` 发送到 Redis 队列。 它默认为 `true`。 <1> 指定此端点是应使用“左推”(当为 `true` 时)还是“右推”(当为 `false` 时)将消息写入 Redis 列表。 如果为 `true`,则与默认 Redis 队列入站通道适配器一起使用时,Redis 列表充当 `FIFO` 队列。 将其设置为 `false` 以与使用“左弹出”从列表读取的软件一起使用,或实现堆栈式消息顺序。 它默认为 `true`。 从 4.3 版本开始。
Redis 应用程序事件
从 Spring Integration 3.0 开始,Redis 模块提供了 IntegrationEvent
的实现,它反过来又是 org.springframework.context.ApplicationEvent
。
RedisExceptionEvent
封装了 Redis 操作中的异常(端点是事件的“源”)。
例如,<int-redis:queue-inbound-channel-adapter/>
在捕获 BoundListOperations.rightPop
操作中的异常后发出这些事件。
异常可以是任何通用的 org.springframework.data.redis.RedisSystemException
或 org.springframework.data.redis.RedisConnectionFailureException
。
使用 <int-event:inbound-channel-adapter/>
处理这些事件对于确定后台 Redis 任务的问题并采取管理操作很有用。
Redis 消息存储
如 Enterprise Integration Patterns (EIP) 一书中所述,消息存储 允许你持久化消息。
这在处理具有缓冲消息能力(聚合器、重新排序器等)的组件时非常有用,尤其是在关注可靠性时。
在 Spring Integration 中,MessageStore
策略也为 凭证检查 模式奠定了基础,该模式也在 EIP 中进行了描述。
Spring Integration 的 Redis 模块提供了 RedisMessageStore
。
以下示例显示了如何将其与聚合器一起使用:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一个 bean 配置,它需要 RedisConnectionFactory
作为构造函数参数。
默认情况下,RedisMessageStore
使用 Java 序列化来序列化消息。
但是,如果你想使用不同的序列化技术(例如 JSON),你可以通过设置 RedisMessageStore
的 valueSerializer
属性来提供你自己的序列化器。
从 4.3.10 版本开始,框架为 Message
实例和 MessageHeaders
实例提供了 Jackson 序列化器和反序列化器实现——分别是 MessageJacksonDeserializer
和 MessageHeadersJacksonSerializer
。
它们必须使用 ObjectMapper
的 SimpleModule
选项进行配置。
此外,你应该在 ObjectMapper
上设置 enableDefaultTyping
以添加每个序列化复杂对象的类型信息(如果你信任源)。
然后,在反序列化期间使用该类型信息。
框架提供了一个名为 JacksonJsonUtils.messagingAwareMapper()
的实用方法,该方法已提供所有前面提到的属性和序列化器。
此实用方法带有 trustedPackages
参数,用于限制反序列化的 Java 包,以避免安全漏洞。
默认受信任的包包括:java.util
、java.lang
、org.springframework.messaging.support
、org.springframework.integration.support
、org.springframework.integration.message
、org.springframework.integration.store
。
要在 RedisMessageStore
中管理 JSON 序列化,你必须按照以下示例进行配置:
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从 4.3.12 版本开始,RedisMessageStore
支持 prefix
选项,以允许区分同一 Redis 服务器上的存储实例。
Redis 通道消息存储
RedisMessageStore
redis-message-store 将每个组作为单个键(组 ID)下的值进行维护。
虽然你可以使用它来支持 QueueChannel
进行持久化,但为此目的提供了一个专门的 RedisChannelMessageStore
(从 4.0 版本开始)。
此存储为每个通道使用 LIST
,发送消息时使用 LPUSH
,接收消息时使用 RPOP
。
默认情况下,此存储也使用 JDK 序列化,但你可以修改值序列化器,如 redis-message-store。
我们建议使用此存储支持通道,而不是使用通用的 RedisMessageStore
。
以下示例定义了一个 Redis 消息存储,并在带有队列的通道中使用它:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键的形式为:<storeBeanName>:<channelId>
(在前面的示例中,redisMessageStore:somePersistentQueueChannel
)。
此外,还提供了一个子类 RedisChannelPriorityMessageStore
。
当你将其与 QueueChannel
一起使用时,消息以(FIFO)优先级顺序接收。
它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY
标头并支持优先级值(0 - 9
)。
其他优先级(和没有优先级)的消息在任何具有优先级的消息之后以 FIFO 顺序检索。
这些存储仅实现 BasicMessageGroupStore
,不实现 MessageGroupStore
。
它们只能用于支持 QueueChannel
等情况。
Redis 元数据存储
Spring Integration 3.0 引入了新的基于 Redis 的 MetadataStore
(参见 Metadata Store)实现。
你可以使用 RedisMetadataStore
在应用程序重启后维护 MetadataStore
的状态。
你可以将此新的 MetadataStore
实现与以下适配器一起使用:
要指示这些适配器使用新的 RedisMetadataStore
,请声明一个名为 metadataStore
的 Spring bean。
Feed 入站通道适配器和 feed 入站通道适配器都会自动选择并使用声明的 RedisMetadataStore
。
以下示例显示了如何声明此类 bean:
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore
由 RedisProperties
支持。
与其交互使用 BoundHashOperations
,这反过来又需要整个 Properties
存储的 key
。
在 MetadataStore
的情况下,此 key
扮演区域的角色,这在分布式环境中很有用,当多个应用程序使用同一个 Redis 服务器时。
默认情况下,此 key
的值为 MetaData
。
从 4.0 版本开始,此存储实现了 ConcurrentMetadataStore
,允许它在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。
你不能将 RedisMetadataStore.replace()
(例如,在 AbstractPersistentAcceptOnceFileListFilter
中)与 Redis 集群一起使用,因为目前不支持用于原子性的 WATCH
命令。
Redis 存储入站通道适配器
Redis 存储入站通道适配器是一个轮询消费者,它从 Redis 集合中读取数据并将其作为 Message
有效负载发送。
以下示例显示了如何配置 Redis 存储入站通道适配器:
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例显示了如何使用 store-inbound-channel-adapter
元素配置 Redis 存储入站通道适配器,并为各种属性提供值,例如:
-
key
或key-expression
:所用集合的键名。 -
collection-type
:此适配器支持的集合类型的枚举。 支持的集合是LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
connection-factory
:对o.s.data.redis.connection.RedisConnectionFactory
实例的引用。 -
redis-template
:对o.s.data.redis.core.RedisTemplate
实例的引用。 -
所有其他入站适配器共有的其他属性(例如“channel”)。
你不能同时设置 |
默认情况下,适配器使用 StringRedisTemplate
。
这使用 StringRedisSerializer
实例用于键、值、哈希键和哈希值。
如果你的 Redis 存储包含使用其他技术序列化的对象,你必须提供配置有适当序列化器的 RedisTemplate
。
例如,如果存储是使用 Redis 存储出站适配器写入的,并且其 extract-payload-elements
设置为 false
,则你必须提供配置如下的 RedisTemplate
:
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
</bean>
RedisTemplate
使用 String
序列化器用于键和哈希键,并使用默认的 JDK 序列化序列化器用于值和哈希值。
因为它具有 key
的字面值,所以前面的示例相对简单和静态。
有时,你可能需要根据某些条件在运行时更改键的值。
为此,请改用 key-expression
,其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,你可能希望对从 Redis 集合中读取的成功处理的数据执行一些后处理。
例如,你可能希望在处理后移动或删除该值。
你可以通过使用 Spring Integration 2.2 中添加的事务同步功能来实现此目的。
以下示例使用 key-expression
和事务同步:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
你可以通过使用 transactional
元素将轮询器声明为事务性的。
此元素可以引用一个真正的事务管理器(例如,如果你的流的某个其他部分调用 JDBC)。
如果你没有“真正的”事务,你可以使用 o.s.i.transaction.PseudoTransactionManager
,它是 Spring PlatformTransactionManager
的实现,并且在没有实际事务时可以使用 Redis 适配器的事务同步功能。
这不会使 Redis 活动本身具有事务性。 它允许在成功(提交)之前或之后或在失败(回滚)之后进行操作同步。
一旦你的轮询器是事务性的,你就可以在 transactional
元素上设置 o.s.i.transaction.TransactionSynchronizationFactory
的实例。
TransactionSynchronizationFactory
创建 TransactionSynchronization
的实例。
为了方便起见,我们公开了一个默认的基于 SpEL 的 TransactionSynchronizationFactory
,它允许你配置 SpEL 表达式,并将其执行与事务协调(同步)。
支持提交前、提交后和回滚后的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。
对于每个子元素,你可以指定 expression
和 channel
属性。
如果只存在 channel
属性,则收到的消息作为特定同步场景的一部分发送到那里。
如果只存在 expression
属性,并且表达式的结果是非空值,则会生成一个以结果作为有效负载的消息,并将其发送到默认通道 (NullChannel
) 并出现在日志中(在 DEBUG
级别)。
如果你希望评估结果发送到特定通道,请添加 channel
属性。
如果表达式的结果为 null 或 void,则不生成消息。
RedisStoreMessageSource
添加了一个 store
属性,其中包含绑定到事务 IntegrationResourceHolder
的 RedisStore
实例,可以从 TransactionSynchronizationProcessor
实现访问该实例。
有关事务同步的更多信息,请参阅 事务同步。
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许你将消息有效负载写入 Redis 集合,如以下示例所示:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置使用 store-inbound-channel-adapter
元素配置 Redis 存储出站通道适配器。
它为各种属性提供值,例如:
-
key
或key-expression
:所用集合的键名。 -
extract-payload-elements
:如果设置为true
(默认值)且有效负载是“多值”对象(即Collection
或Map
)的实例,则使用“addAll
”和“putAll
”语义存储。 否则,如果设置为false
,则无论其类型如何,有效负载都存储为单个条目。 如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。 -
collection-type
:此适配器支持的Collection
类型的枚举。 支持的集合是LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
map-key-expression
:返回所存储条目键名的 SpEL 表达式。 仅当collection-type
是MAP
或PROPERTIES
且“extract-payload-elements”为 false 时适用。 -
connection-factory
:对o.s.data.redis.connection.RedisConnectionFactory
实例的引用。 -
redis-template
:对o.s.data.redis.core.RedisTemplate
实例的引用。 -
所有其他入站适配器共有的其他属性(例如“channel”)。
你不能同时设置 |
默认情况下,适配器使用 StringRedisTemplate
。
这使用 StringRedisSerializer
实例用于键、值、哈希键和哈希值。
但是,如果 extract-payload-elements
设置为 false
,则将使用 RedisTemplate
,其中包含 StringRedisSerializer
实例用于键和哈希键,以及 JdkSerializationRedisSerializer
实例用于值和哈希值。
使用 JDK 序列化器时,重要的是要了解 Java 序列化用于所有值,无论该值是否实际是集合。
如果你需要对值的序列化进行更多控制,请考虑提供自己的 RedisTemplate
,而不是依赖这些默认值。
因为它具有 key
和其他属性的字面值,所以前面的示例相对简单和静态。
有时,你可能需要根据某些条件在运行时动态更改这些值。
为此,请使用它们的 -expression
等效项(key-expression
、map-key-expression
等),其中提供的表达式可以是任何有效的 SpEL 表达式。
Redis 出站命令网关
Spring Integration 4.0 引入了 Redis 命令网关,允许你使用通用的 RedisConnection#execute
方法执行任何标准 Redis 命令。
以下列表显示了 Redis 出站网关的可用属性:
<int-redis:outbound-gateway
request-channel="" [id="CO3-1"]1
reply-channel="" [id="CO3-2"]2
requires-reply="" [id="CO3-3"]3
reply-timeout="" [id="CO3-4"]4
connection-factory="" [id="CO3-5"]5
redis-template="" [id="CO3-6"]6
arguments-serializer="" [id="CO3-7"]7
command-expression="" [id="CO3-8"]8
argument-expressions="" [id="CO3-9"]9
use-command-variable="" [id="CO3-10"]10
arguments-strategy="" /> [id="CO3-11"]11
<1> 此端点从中接收 `Message` 实例的 `MessageChannel`。 <1> 此端点发送回复 `Message` 实例的 `MessageChannel`。 <1> 指定此出站网关是否必须返回非空值。 它默认为 `true`。 当 Redis 返回 `null` 值时,将抛出 `ReplyRequiredException`。 <1> 等待发送回复消息的超时时间(以毫秒为单位)。 它通常适用于基于队列的受限回复通道。 <1> 对 `RedisConnectionFactory` bean 的引用。 它默认为 `redisConnectionFactory`。 它与“redis-template”属性互斥。 <1> 对 `RedisTemplate` bean 的引用。 它与“connection-factory”属性互斥。 <1> 对 `org.springframework.data.redis.serializer.RedisSerializer` 实例的引用。 如果需要,它用于将每个命令参数序列化为 byte[]。 <1> 返回命令键的 SpEL 表达式。 它默认为 `redis_command` 消息头。 它不能评估为 `null`。 <1> 逗号分隔的 SpEL 表达式,它们被评估为命令参数。 与 `arguments-strategy` 属性互斥。 如果你不提供任何属性,则 `payload` 用作命令参数。 参数表达式可以评估为“null”以支持可变数量的参数。 <1> 一个 `boolean` 标志,用于指定当配置 `argument-expressions` 时,评估的 Redis 命令字符串是否在 `o.s.i.redis.outbound.ExpressionArgumentsStrategy` 中的表达式评估上下文中作为 `#cmd` 变量可用。 否则,此属性将被忽略。 <1> 对 `o.s.i.redis.outbound.ArgumentsStrategy` 实例的引用。 它与 `argument-expressions` 属性互斥。 如果你不提供任何属性,则 `payload` 用作命令参数。
你可以使用 <int-redis:outbound-gateway>
作为通用组件来执行任何所需的 Redis 操作。
以下示例显示了如何从 Redis 原子数获取递增值:
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message
有效负载应该有一个名称 redisCounter
,这可能由 org.springframework.data.redis.support.atomic.RedisAtomicInteger
bean 定义提供。
RedisConnection#execute
方法的返回类型是泛型 Object
。
实际结果取决于命令类型。
例如,MGET
返回 List<byte[]>
。
有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范。
Redis 队列出站网关
Spring Integration 引入了 Redis 队列出站网关来执行请求和回复场景。
它将对话 UUID
推送到提供的 queue
,将以该 UUID
作为其键的值推送到 Redis 列表,并等待来自 Redis 列表的回复,其键为 UUID
加 .reply
。
每次交互都使用不同的 UUID。
以下列表显示了 Redis 出站网关的可用属性:
<int-redis:queue-outbound-gateway
request-channel="" [id="CO4-1"]1
reply-channel="" [id="CO4-2"]2
requires-reply="" [id="CO4-3"]3
reply-timeout="" [id="CO4-4"]4
connection-factory="" [id="CO4-5"]5
queue="" [id="CO4-6"]6
order="" [id="CO4-7"]7
serializer="" [id="CO4-8"]8
extract-payload=""/> [id="CO4-9"]9
<1> 此端点从中接收 `Message` 实例的 `MessageChannel`。 <1> 此端点发送回复 `Message` 实例的 `MessageChannel`。 <1> 指定此出站网关是否必须返回非空值。 此值默认为 `false`。 否则,当 Redis 返回 `null` 值时,将抛出 `ReplyRequiredException`。 <1> 等待发送回复消息的超时时间(以毫秒为单位)。 它通常适用于基于队列的受限回复通道。 <1> 对 `RedisConnectionFactory` bean 的引用。 它默认为 `redisConnectionFactory`。 它与“redis-template”属性互斥。 <1> 出站网关发送对话 `UUID` 的 Redis 列表的名称。 <1> 当注册多个网关时,此出站网关的顺序。 <1> `RedisSerializer` bean 引用。 它可以是空字符串,表示“无序列化器”。 在这种情况下,入站 Redis 消息的原始 `byte[]` 作为 `Message` 有效负载发送到 `channel`。 默认情况下,它是 `JdkSerializationRedisSerializer`。 <1> 指定此端点是否期望 Redis 队列中的数据包含整个 `Message` 实例。 如果此属性设置为 `true`,则 `serializer` 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。
它从提供的 queue
弹出对话 UUID
,从 Redis 列表弹出以该 UUID
作为其键的值,并将回复推送到键为 UUID
加 .reply
的 Redis 列表。
以下列表显示了 Redis 队列入站网关的可用属性:
<int-redis:queue-inbound-gateway
request-channel="" [id="CO5-1"]1
reply-channel="" [id="CO5-2"]2
executor="" [id="CO5-3"]3
reply-timeout="" [id="CO5-4"]4
connection-factory="" [id="CO5-5"]5
queue="" [id="CO5-6"]6
order="" [id="CO5-7"]7
serializer="" [id="CO5-8"]8
receive-timeout="" [id="CO5-9"]9
expect-message="" [id="CO5-10"]10
recovery-interval=""/> [id="CO5-11"]11
<1> 此端点将从 Redis 数据创建的 `Message` 实例发送到的 `MessageChannel`。 <1> 此端点从中等待回复 `Message` 实例的 `MessageChannel`。 可选 - `replyChannel` 标头仍在使用中。 <1> 对 Spring `TaskExecutor`(或标准 JDK `Executor`)bean 的引用。 它用于底层侦听任务。 它默认为 `SimpleAsyncTaskExecutor`。 <1> 等待发送回复消息的超时时间(以毫秒为单位)。 它通常适用于基于队列的受限回复通道。 <1> 对 `RedisConnectionFactory` bean 的引用。 它默认为 `redisConnectionFactory`。 它与“redis-template”属性互斥。 <1> 对话 `UUID` 的 Redis 列表的名称。 <1> 当注册多个网关时,此入站网关的顺序。 <1> `RedisSerializer` bean 引用。 它可以是空字符串,表示“无序列化器”。 在这种情况下,入站 Redis 消息的原始 `byte[]` 作为 `Message` 有效负载发送到 `channel`。 它默认为 `JdkSerializationRedisSerializer`。 (请注意,在 4.3 版本之前,它默认是 `StringRedisSerializer`。 要恢复该行为,请提供对 `StringRedisSerializer` 的引用)。 <1> 等待接收消息的超时时间(以毫秒为单位)。 它通常适用于基于队列的受限请求通道。 <1> 指定此端点是否期望 Redis 队列中的数据包含整个 `Message` 实例。 如果此属性设置为 `true`,则 `serializer` 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 <1> 侦听器任务在“右弹出”操作发生异常后应休眠的时间(以毫秒为单位),然后重新启动侦听器任务。
task-executor
必须配置为具有多个线程进行处理;否则,当 RedisQueueMessageDrivenEndpoint
尝试在错误后重新启动侦听器任务时,可能会发生死锁。
errorChannel
可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露给可能的死锁情况。
有关可能的 TaskExecutor
实现,请参阅 Spring Framework 参考手册。
Redis Stream 出站通道适配器
Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道适配器,用于将消息有效负载写入 Redis Stream。
出站通道适配器使用 ReactiveStreamOperations.add(…)
将 Record
添加到流中。
以下示例显示了如何将 Java 配置和 Service 类用于 Redis Stream 出站通道适配器。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); [id="CO6-1"]1
reactiveStreamMessageHandler.setSerializationContext(serializationContext); [id="CO6-2"]2
reactiveStreamMessageHandler.setHashMapper(hashMapper); [id="CO6-3"]3
reactiveStreamMessageHandler.setExtractPayload(true); [id="CO6-4"]4
return reactiveStreamMessageHandler;
}
<1> 使用 `ReactiveRedisConnectionFactory` 和流名称构造 `ReactiveRedisStreamMessageHandler` 实例以添加记录。 另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。 <1> 设置用于在添加到流之前序列化记录键和值的 `RedisSerializationContext`。 <1> 设置 `HashMapper`,它提供 Java 类型和 Redis 哈希/映射之间的契约。 <1> 如果为 `true`,通道适配器将从请求消息中提取有效负载以添加流记录。 或者使用整个消息作为值。 它默认为 `true`。
从 6.5 版本开始,ReactiveRedisStreamMessageHandler
提供了一个 setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction)
,用于基于请求消息构建内部 ReactiveStreamOperations.add(Record<K, ?> record, XAddOptions xAddOptions)
调用的 RedisStreamCommands.XAddOptions
。
Redis Stream 入站通道适配器
Spring Integration 5.4 引入了 Reactive Stream 入站通道适配器,用于从 Redis Stream 读取消息。
入站通道适配器根据自动确认标志使用 StreamReceiver.receive(…)
或 StreamReceiver.receiveAutoAck()
从 Redis Stream 读取记录。
以下示例显示了如何将 Java 配置用于 Redis Stream 入站通道适配器。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); [id="CO7-1"]1
messageProducer.setStreamReceiverOptions( [id="CO7-2"]2
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); [id="CO7-3"]3
messageProducer.setAutoAck(false); [id="CO7-4"]4
messageProducer.setCreateConsumerGroup(true); [id="CO7-5"]5
messageProducer.setConsumerGroup("my-group"); [id="CO7-6"]6
messageProducer.setConsumerName("my-consumer"); [id="CO7-7"]7
messageProducer.setOutputChannel(fromRedisStreamChannel); [id="CO7-8"]8
messageProducer.setReadOffset(ReadOffset.latest()); [id="CO7-9"]9
messageProducer.extractPayload(true); [id="CO7-10"]10
return messageProducer;
}
<1> 使用 `ReactiveRedisConnectionFactory` 和流键构造 `ReactiveRedisStreamMessageProducer` 实例以读取记录。 <1> 一个 `StreamReceiver.StreamReceiverOptions`,用于使用反应式基础设施消费 redis 流。 <1> 一个 `SmartLifecycle` 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。 它默认为 `true`。 如果为 `false`,则应手动启动 `RedisStreamMessageProducer` (`messageProducer.start()`)。 <1> 如果为 `false`,则收到的消息不会自动确认。 消息的确认将推迟到客户端消费消息。 它默认为 `true`。 <1> 如果为 `true`,则将创建一个消费者组。 在创建消费者组期间,也将创建流(如果尚不存在)。 消费者组跟踪消息传递并区分消费者。 它默认为 `false`。 <1> 设置消费者组名称。 它默认为定义的 bean 名称。 <1> 设置消费者名称。 从组 `my-group` 读取消息作为 `my-consumer`。 <1> 此端点将消息发送到的消息通道。 <1> 定义读取消息的偏移量。 它默认为 `ReadOffset.latest()`。 <1> 如果为 `true`,通道适配器将从 `Record` 中提取有效负载值。 否则,整个 `Record` 用作有效负载。 它默认为 `true`。
如果 autoAck
设置为 false
,则 Redis Stream 中的 Record
不会被 Redis 驱动程序自动确认,而是将 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
标头添加到要生成的消息中,其值为 SimpleAcknowledgment
实例。
当业务逻辑完成基于此类记录的消息时,目标集成流负责调用其 acknowledge()
回调。
即使在反序列化期间发生异常并配置了 errorChannel
,也需要类似的逻辑。
因此,目标错误处理程序必须决定是否确认或拒绝此类失败消息。
除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
之外,ReactiveRedisStreamMessageProducer
还会将这些标头填充到要生成的消息中:RedisHeaders.STREAM_KEY
、RedisHeaders.STREAM_MESSAGE_ID
、RedisHeaders.CONSUMER_GROUP
和 RedisHeaders.CONSUMER
。
从 5.5 版本开始,你可以在 ReactiveRedisStreamMessageProducer
上显式配置 StreamReceiver.StreamReceiverOptionsBuilder
选项,包括新引入的 onErrorResume
函数,如果 Redis Stream 消费者在发生反序列化错误时应继续轮询,则需要此函数。
默认函数将消息发送到错误通道(如果提供),并可能对失败消息进行确认,如上所述。
所有这些 StreamReceiver.StreamReceiverOptionsBuilder
都与外部提供的 StreamReceiver.StreamReceiverOptions
互斥。
Redis 锁注册表
Spring Integration 4.0 引入了 RedisLockRegistry
。
某些组件(例如,聚合器和重新排序器)使用从 LockRegistry
实例获取的锁来确保一次只有一个线程操作一个组。
DefaultLockRegistry
在单个组件中执行此功能。
你现在可以在这些组件上配置外部锁注册表。
当与共享 MessageGroupStore
一起使用时,你可以使用 RedisLockRegistry
在多个应用程序实例之间提供此功能,这样一次只有一个实例可以操作该组。
当本地线程释放锁时,另一个本地线程通常可以立即获取锁。 如果锁由使用不同注册表实例的线程释放,则获取锁可能需要长达 100 毫秒。
为了避免“挂起”的锁(当服务器发生故障时),此注册表中的锁在默认的 60 秒后过期,但你可以在注册表上配置此值。 锁通常持有时间要短得多。
由于键可能会过期,尝试解锁过期锁会导致抛出异常。 但是,受此类锁保护的资源可能已受到损害,因此此类异常应被视为严重异常。 你应该将过期时间设置得足够大,以防止这种情况发生,但要设置得足够低,以便在服务器故障后能在合理的时间内恢复锁。
从 5.0 版本开始,RedisLockRegistry
实现了 ExpirableLockRegistry
,它会删除最后获取时间超过 age
且当前未锁定的锁。
从 5.5.6 版本开始,RedisLockRegistry
支持通过 RedisLockRegistry.setCacheCapacity()
自动清理 RedisLockRegistry.locks
中的 redisLocks 缓存。
有关更多信息,请参阅其 JavaDocs。
从 5.5.13 版本开始,RedisLockRegistry
公开了一个 setRedisLockType(RedisLockType)
选项,用于确定 Redis 锁获取应以何种模式发生:
-
RedisLockType.SPIN_LOCK
- 通过定期循环(100 毫秒)检查是否可以获取锁来获取锁。 默认。 -
RedisLockType.PUB_SUB_LOCK
- 通过 redis 发布-订阅订阅获取锁。
发布-订阅是首选模式——客户端 Redis 服务器之间的网络通信更少,性能更高——当订阅在另一个进程中收到解锁通知时,锁会立即获取。 但是,Redis 不支持主/副本连接中的发布-订阅(例如在 AWS ElastiCache 环境中),因此选择忙等待模式作为默认值,以使注册表在任何环境中都能工作。
从 6.4 版本开始,RedisLockRegistry.RedisLock.unlock()
方法不再抛出 IllegalStateException
,而是抛出 ConcurrentModificationException
,如果锁的所有权已过期。
从 6.4 版本开始,添加了 RedisLockRegistry.setRenewalTaskScheduler()
来配置用于定期续订锁的调度程序。
设置后,锁将在成功获取锁后每 1/3
的过期时间自动续订,直到解锁或 redis 键被删除。