Redis 支持

Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。 此支持以基于 Redis 的 MessageStore 以及发布-订阅消息适配器的形式提供,这些适配器通过 Redis 的 PUBLISHSUBSCRIBEUNSUBSCRIBE 命令提供支持。 你需要将此依赖项包含到你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:{project-version}"

你还需要包含 Redis 客户端依赖,例如 Lettuce。 要下载、安装和运行 Redis,请参阅 Redis 文档

连接到 Redis

要开始与 Redis 交互,首先需要连接到它。 Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 构造:ConnectionFactoryTemplate。 这些抽象简化了与多个 Redis 客户端 Java API 的集成。 目前,Spring Data Redis 支持 JedisLettuce

使用 RedisConnectionFactory

要连接到 Redis,你可以使用 RedisConnectionFactory 接口的其中一个实现。 以下列表显示了接口定义:

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

    /**
     * Provides a suitable connection for interacting with Redis.
     * @return connection for interacting with Redis.
     */
    RedisConnection getConnection();
}

以下示例显示了如何在 Java 中创建 LettuceConnectionFactory

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();

以下示例显示了如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

RedisConnectionFactory 的实现提供了一组属性,例如端口和主机,你可以根据需要进行设置。 获得 RedisConnectionFactory 实例后,你可以创建 RedisTemplate 实例并使用 RedisConnectionFactory 注入它。

使用 RedisTemplate

与 Spring 中的其他模板类(例如 JdbcTemplateJmsTemplate)一样,RedisTemplate 是一个简化 Redis 数据访问代码的辅助类。 有关 RedisTemplate 及其变体(例如 StringRedisTemplate)的更多信息,请参阅 Spring Data Redis 文档

以下示例显示了如何在 Java 中创建 RedisTemplate 实例:

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);

以下示例显示了如何在 Spring 的 XML 配置中创建 RedisTemplate 实例:

<bean id="redisTemplate"
         class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

使用 Redis 进行消息传递

[the introduction] 中所述,Redis 通过其 PUBLISHSUBSCRIBEUNSUBSCRIBE 命令提供发布-订阅消息传递支持。 与 JMS 和 AMQP 一样,Spring Integration 提供消息通道和适配器,用于通过 Redis 发送和接收消息。

Redis 发布/订阅通道

与 JMS 类似,在某些情况下,生产者和消费者都旨在成为同一应用程序的一部分,并在同一进程中运行。 你可以通过使用一对入站和出站通道适配器来实现此目的。 但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决此用例。 你可以创建一个发布-订阅通道,如以下示例所示:

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

publish-subscribe-channel 的行为与主 Spring Integration 命名空间中的普通 <publish-subscribe-channel/> 元素非常相似。 它可以被任何端点的 input-channeloutput-channel 属性引用。 不同之处在于,此通道由 Redis 主题名称支持:一个由 topic-name 属性指定的 String 值。 但是,与 JMS 不同,此主题不必提前创建,甚至不必由 Redis 自动创建。 在 Redis 中,主题是简单的 String 值,充当地址的角色。 生产者和消费者可以使用相同的 String 值作为其主题名称进行通信。 简单订阅此通道意味着生产者和消费者端点之间可以进行异步发布-订阅消息传递。 但是,与通过在简单 Spring Integration <channel/> 元素中添加 <queue/> 元素创建的异步消息通道不同,消息不存储在内存队列中。 相反,这些消息通过 Redis 传递,这使你可以依赖其持久性和集群支持以及与其他非 Java 平台的互操作性。

Redis 入站通道适配器

Redis 入站通道适配器 (RedisInboundChannelAdapter) 以与其他入站适配器相同的方式将传入的 Redis 消息适配为 Spring 消息。 它接收特定于平台的消息(在本例中为 Redis)并使用 MessageConverter 策略将它们转换为 Spring 消息。 以下示例显示了如何配置 Redis 入站通道适配器:

<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="thing1, thing2"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。 请注意,前面的配置依赖于熟悉的 Spring 自动发现某些 bean 的范例。 在本例中,redisConnectionFactory 被隐式注入到适配器中。 你可以通过使用 connection-factory 属性来显式指定它。

此外,请注意,前面的配置使用自定义 MessageConverter 注入适配器。 该方法类似于 JMS,其中 MessageConverter 实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。 默认是 SimpleMessageConverter

入站适配器可以订阅多个主题名称,因此 topics 属性中有一组逗号分隔的值。

从 3.0 版本开始,入站适配器除了现有的 topics 属性外,现在还具有 topic-patterns 属性。 此属性包含一组逗号分隔的 Redis 主题模式。 有关 Redis 发布-订阅的更多信息,请参阅 Redis Pub/Sub

入站适配器可以使用 RedisSerializer 反序列化 Redis 消息的主体。 <int-redis:inbound-channel-adapter>serializer 属性可以设置为空字符串,这会导致 RedisSerializer 属性的值为 null。 在这种情况下,Redis 消息的原始 byte[] 主体作为消息有效负载提供。

从 5.0 版本开始,你可以通过使用 <int-redis:inbound-channel-adapter>task-executor 属性向入站适配器提供 Executor 实例。 此外,收到的 Spring Integration 消息现在具有 RedisHeaders.MESSAGE_SOURCE 标头,以指示已发布消息的来源:主题或模式。 你可以将此用于下游路由逻辑。

Redis 出站通道适配器

Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息适配为 Redis 消息。 它接收 Spring Integration 消息并使用 MessageConverter 策略将它们转换为特定于平台的消息(在本例中为 Redis)。 以下示例显示了如何配置 Redis 出站通道适配器:

<int-redis:outbound-channel-adapter id="outboundAdapter"
    channel="sendChannel"
    topic="thing1"
    message-converter="testConverter"/>

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

该配置与 Redis 入站通道适配器并行。 适配器被隐式注入 RedisConnectionFactory,该工厂的 bean 名称定义为 redisConnectionFactory。 此示例还包括可选的(和自定义的)MessageConvertertestConverter bean)。

从 Spring Integration 3.0 开始,<int-redis:outbound-channel-adapter> 提供了 topic 属性的替代方案:你可以使用 topic-expression 属性在运行时确定消息的 Redis 主题。 这些属性是互斥的。

Redis 队列入站通道适配器

Spring Integration 3.0 引入了队列入站通道适配器,用于从 Redis 列表中"`弹出`"消息。 默认情况下,它使用"`右弹出`",但你可以将其配置为使用"`左弹出`"。 适配器是消息驱动的。 它使用内部侦听器线程,不使用轮询器。

以下列表显示了 queue-inbound-channel-adapter 的所有可用属性:

<int-redis:queue-inbound-channel-adapter id=""  [id="CO1-1"]1
                    channel=""  [id="CO1-2"]2
                    auto-startup=""  [id="CO1-3"]3
                    phase=""  [id="CO1-4"]4
                    connection-factory=""  [id="CO1-5"]5
                    queue=""  [id="CO1-6"]6
                    error-channel=""  [id="CO1-7"]7
                    serializer=""  [id="CO1-8"]8
                    receive-timeout=""  [id="CO1-9"]9
                    recovery-interval=""  [id="CO1-10"]10
                    expect-message=""  [id="CO1-11"]11
                    task-executor=""  [id="CO1-12"]12
                    right-pop=""/>  [id="CO1-13"]13
 <1>  组件 bean 名称。
如果你不提供 `channel` 属性,则会创建一个 `DirectChannel`,并以此 `id` 属性作为 bean 名称在应用程序上下文中注册。
在这种情况下,端点本身以 `id` 加 `.adapter` 的 bean 名称注册。
(如果 bean 名称是 `thing1`,则端点注册为 `thing1.adapter`。)
 <1>  此端点将 `Message` 实例发送到的 `MessageChannel`。
 <1>  一个 `SmartLifecycle` 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为 `true`。
 <1>  一个 `SmartLifecycle` 属性,用于指定此端点启动的阶段。
它默认为 `0`。
 <1>  对 `RedisConnectionFactory` bean 的引用。
它默认为 `redisConnectionFactory`。
 <1>  执行基于队列的“pop”操作以获取 Redis 消息的 Redis 列表的名称。
 <1>  当从端点的侦听任务收到异常时,将 `ErrorMessage` 实例发送到的 `MessageChannel`。
默认情况下,底层 `MessagePublishingErrorHandler` 使用应用程序上下文中的默认 `errorChannel`。
 <1>  `RedisSerializer` bean 引用。
它可以是空字符串,表示“无序列化器”。
在这种情况下,入站 Redis 消息的原始 `byte[]` 作为 `Message` 有效负载发送到 `channel`。
默认情况下,它是 `JdkSerializationRedisSerializer`。
 <1>  “pop”操作等待来自队列的 Redis 消息的超时时间(以毫秒为单位)。
默认值为 1 秒。
 <1>  侦听器任务在“pop”操作发生异常后应休眠的时间(以毫秒为单位),然后重新启动侦听器任务。
 <1>  指定此端点是否期望 Redis 队列中的数据包含整个 `Message` 实例。
如果此属性设置为 `true`,则 `serializer` 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。
其默认值为 `false`。
 <1>  对 Spring `TaskExecutor`(或标准 JDK 1.5+ `Executor`)bean 的引用。
它用于底层侦听任务。
它默认为 `SimpleAsyncTaskExecutor`。
 <1>  指定此端点是应使用“右弹出”(当为 `true` 时)还是“左弹出”(当为 `false` 时)从 Redis 列表中读取消息。
如果为 `true`,则与默认 Redis 队列出站通道适配器一起使用时,Redis 列表充当 `FIFO` 队列。
将其设置为 `false` 以与使用“右推”写入列表的软件一起使用,或实现堆栈式消息顺序。
其默认值为 `true`。
从 4.3 版本开始。

task-executor 必须配置为具有多个线程进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动侦听器任务时,可能会发生死锁。 errorChannel 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露给可能的死锁情况。 有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册

Redis 队列出站通道适配器

Spring Integration 3.0 引入了队列出站通道适配器,用于从 Spring Integration 消息"`推`"到 Redis 列表。 默认情况下,它使用"`左推`",但你可以将其配置为使用"`右推`"。 以下列表显示了 Redis queue-outbound-channel-adapter 的所有可用属性:

<int-redis:queue-outbound-channel-adapter id=""  [id="CO2-1"]1
                    channel=""  [id="CO2-2"]2
                    connection-factory=""  [id="CO2-3"]3
                    queue=""  [id="CO2-4"]4
                    queue-expression=""  [id="CO2-5"]5
                    serializer=""  [id="CO2-6"]6
                    extract-payload=""  [id="CO2-7"]7
                    left-push=""/>  [id="CO2-8"]8
 <1>  组件 bean 名称。
如果你不提供 `channel` 属性,则会创建一个 `DirectChannel`,并以此 `id` 属性作为 bean 名称在应用程序上下文中注册。
在这种情况下,端点以 `id` 加 `.adapter` 的 bean 名称注册。
(如果 bean 名称是 `thing1`,则端点注册为 `thing1.adapter`。)
 <1>  此端点从中接收 `Message` 实例的 `MessageChannel`。
 <1>  对 `RedisConnectionFactory` bean 的引用。
它默认为 `redisConnectionFactory`。
 <1>  执行基于队列的“push”操作以发送 Redis 消息的 Redis 列表的名称。
此属性与 `queue-expression` 互斥。
 <1>  一个 SpEL `Expression`,用于确定 Redis 列表的名称。
它在运行时使用传入的 `Message` 作为 `#root` 变量。
此属性与 `queue` 互斥。
 <1>  一个 `RedisSerializer` bean 引用。
它默认为 `JdkSerializationRedisSerializer`。
但是,对于 `String` 有效负载,如果未提供 `serializer` 引用,则使用 `StringRedisSerializer`。
 <1>  指定此端点是应仅将有效负载还是将整个 `Message` 发送到 Redis 队列。
它默认为 `true`。
 <1>  指定此端点是应使用“左推”(当为 `true` 时)还是“右推”(当为 `false` 时)将消息写入 Redis 列表。
如果为 `true`,则与默认 Redis 队列入站通道适配器一起使用时,Redis 列表充当 `FIFO` 队列。
将其设置为 `false` 以与使用“左弹出”从列表读取的软件一起使用,或实现堆栈式消息顺序。
它默认为 `true`。
从 4.3 版本开始。

Redis 应用程序事件

从 Spring Integration 3.0 开始,Redis 模块提供了 IntegrationEvent 的实现,它反过来又是 org.springframework.context.ApplicationEventRedisExceptionEvent 封装了 Redis 操作中的异常(端点是事件的“源”)。 例如,<int-redis:queue-inbound-channel-adapter/> 在捕获 BoundListOperations.rightPop 操作中的异常后发出这些事件。 异常可以是任何通用的 org.springframework.data.redis.RedisSystemExceptionorg.springframework.data.redis.RedisConnectionFailureException。 使用 <int-event:inbound-channel-adapter/> 处理这些事件对于确定后台 Redis 任务的问题并采取管理操作很有用。

Redis 消息存储

Enterprise Integration Patterns (EIP) 一书中所述,消息存储 允许你持久化消息。 这在处理具有缓冲消息能力(聚合器、重新排序器等)的组件时非常有用,尤其是在关注可靠性时。 在 Spring Integration 中,MessageStore 策略也为 凭证检查 模式奠定了基础,该模式也在 EIP 中进行了描述。

Spring Integration 的 Redis 模块提供了 RedisMessageStore。 以下示例显示了如何将其与聚合器一起使用:

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>

前面的示例是一个 bean 配置,它需要 RedisConnectionFactory 作为构造函数参数。

默认情况下,RedisMessageStore 使用 Java 序列化来序列化消息。 但是,如果你想使用不同的序列化技术(例如 JSON),你可以通过设置 RedisMessageStorevalueSerializer 属性来提供你自己的序列化器。

从 4.3.10 版本开始,框架为 Message 实例和 MessageHeaders 实例提供了 Jackson 序列化器和反序列化器实现——分别是 MessageJacksonDeserializerMessageHeadersJacksonSerializer。 它们必须使用 ObjectMapperSimpleModule 选项进行配置。 此外,你应该在 ObjectMapper 上设置 enableDefaultTyping 以添加每个序列化复杂对象的类型信息(如果你信任源)。 然后,在反序列化期间使用该类型信息。 框架提供了一个名为 JacksonJsonUtils.messagingAwareMapper() 的实用方法,该方法已提供所有前面提到的属性和序列化器。 此实用方法带有 trustedPackages 参数,用于限制反序列化的 Java 包,以避免安全漏洞。 默认受信任的包包括:java.utiljava.langorg.springframework.messaging.supportorg.springframework.integration.supportorg.springframework.integration.messageorg.springframework.integration.store。 要在 RedisMessageStore 中管理 JSON 序列化,你必须按照以下示例进行配置:

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);

从 4.3.12 版本开始,RedisMessageStore 支持 prefix 选项,以允许区分同一 Redis 服务器上的存储实例。

Redis 通道消息存储

RedisMessageStore redis-message-store 将每个组作为单个键(组 ID)下的值进行维护。 虽然你可以使用它来支持 QueueChannel 进行持久化,但为此目的提供了一个专门的 RedisChannelMessageStore(从 4.0 版本开始)。 此存储为每个通道使用 LIST,发送消息时使用 LPUSH,接收消息时使用 RPOP。 默认情况下,此存储也使用 JDK 序列化,但你可以修改值序列化器,如 redis-message-store

我们建议使用此存储支持通道,而不是使用通用的 RedisMessageStore。 以下示例定义了一个 Redis 消息存储,并在带有队列的通道中使用它:

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
	<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
<int:channel>

用于存储数据的键的形式为:<storeBeanName>:<channelId>(在前面的示例中,redisMessageStore:somePersistentQueueChannel)。

此外,还提供了一个子类 RedisChannelPriorityMessageStore。 当你将其与 QueueChannel 一起使用时,消息以(FIFO)优先级顺序接收。 它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY 标头并支持优先级值(0 - 9)。 其他优先级(和没有优先级)的消息在任何具有优先级的消息之后以 FIFO 顺序检索。

这些存储仅实现 BasicMessageGroupStore,不实现 MessageGroupStore。 它们只能用于支持 QueueChannel 等情况。

Redis 元数据存储

Spring Integration 3.0 引入了新的基于 Redis 的 MetadataStore(参见 Metadata Store)实现。 你可以使用 RedisMetadataStore 在应用程序重启后维护 MetadataStore 的状态。 你可以将此新的 MetadataStore 实现与以下适配器一起使用:

要指示这些适配器使用新的 RedisMetadataStore,请声明一个名为 metadataStore 的 Spring bean。 Feed 入站通道适配器和 feed 入站通道适配器都会自动选择并使用声明的 RedisMetadataStore。 以下示例显示了如何声明此类 bean:

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

RedisMetadataStoreRedisProperties 支持。 与其交互使用 BoundHashOperations,这反过来又需要整个 Properties 存储的 key。 在 MetadataStore 的情况下,此 key 扮演区域的角色,这在分布式环境中很有用,当多个应用程序使用同一个 Redis 服务器时。 默认情况下,此 key 的值为 MetaData

从 4.0 版本开始,此存储实现了 ConcurrentMetadataStore,允许它在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。

你不能将 RedisMetadataStore.replace()(例如,在 AbstractPersistentAcceptOnceFileListFilter 中)与 Redis 集群一起使用,因为目前不支持用于原子性的 WATCH 命令。

Redis 存储入站通道适配器

Redis 存储入站通道适配器是一个轮询消费者,它从 Redis 集合中读取数据并将其作为 Message 有效负载发送。 以下示例显示了如何配置 Redis 存储入站通道适配器:

<int-redis:store-inbound-channel-adapter id="listAdapter"
    connection-factory="redisConnectionFactory"
    key="myCollection"
    channel="redisChannel"
    collection-type="LIST" >
    <int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

前面的示例显示了如何使用 store-inbound-channel-adapter 元素配置 Redis 存储入站通道适配器,并为各种属性提供值,例如:

  • keykey-expression:所用集合的键名。

  • collection-type:此适配器支持的集合类型的枚举。 支持的集合是 LISTSETZSETPROPERTIESMAP

  • connection-factory:对 o.s.data.redis.connection.RedisConnectionFactory 实例的引用。

  • redis-template:对 o.s.data.redis.core.RedisTemplate 实例的引用。

  • 所有其他入站适配器共有的其他属性(例如“channel”)。

你不能同时设置 redis-templateconnection-factory

默认情况下,适配器使用 StringRedisTemplate。 这使用 StringRedisSerializer 实例用于键、值、哈希键和哈希值。 如果你的 Redis 存储包含使用其他技术序列化的对象,你必须提供配置有适当序列化器的 RedisTemplate。 例如,如果存储是使用 Redis 存储出站适配器写入的,并且其 extract-payload-elements 设置为 false,则你必须提供配置如下的 RedisTemplate

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
    <property name="keySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
    <property name="hashKeySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
</bean>

RedisTemplate 使用 String 序列化器用于键和哈希键,并使用默认的 JDK 序列化序列化器用于值和哈希值。

因为它具有 key 的字面值,所以前面的示例相对简单和静态。 有时,你可能需要根据某些条件在运行时更改键的值。 为此,请改用 key-expression,其中提供的表达式可以是任何有效的 SpEL 表达式。

此外,你可能希望对从 Redis 集合中读取的成功处理的数据执行一些后处理。 例如,你可能希望在处理后移动或删除该值。 你可以通过使用 Spring Integration 2.2 中添加的事务同步功能来实现此目的。 以下示例使用 key-expression 和事务同步:

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

你可以通过使用 transactional 元素将轮询器声明为事务性的。 此元素可以引用一个真正的事务管理器(例如,如果你的流的某个其他部分调用 JDBC)。 如果你没有“真正的”事务,你可以使用 o.s.i.transaction.PseudoTransactionManager,它是 Spring PlatformTransactionManager 的实现,并且在没有实际事务时可以使用 Redis 适配器的事务同步功能。

这不会使 Redis 活动本身具有事务性。 它允许在成功(提交)之前或之后或在失败(回滚)之后进行操作同步。

一旦你的轮询器是事务性的,你就可以在 transactional 元素上设置 o.s.i.transaction.TransactionSynchronizationFactory 的实例。 TransactionSynchronizationFactory 创建 TransactionSynchronization 的实例。 为了方便起见,我们公开了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许你配置 SpEL 表达式,并将其执行与事务协调(同步)。 支持提交前、提交后和回滚后的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。 对于每个子元素,你可以指定 expressionchannel 属性。 如果只存在 channel 属性,则收到的消息作为特定同步场景的一部分发送到那里。 如果只存在 expression 属性,并且表达式的结果是非空值,则会生成一个以结果作为有效负载的消息,并将其发送到默认通道 (NullChannel) 并出现在日志中(在 DEBUG 级别)。 如果你希望评估结果发送到特定通道,请添加 channel 属性。 如果表达式的结果为 null 或 void,则不生成消息。

RedisStoreMessageSource 添加了一个 store 属性,其中包含绑定到事务 IntegrationResourceHolderRedisStore 实例,可以从 TransactionSynchronizationProcessor 实现访问该实例。

有关事务同步的更多信息,请参阅 事务同步

RedisStore 出站通道适配器

RedisStore 出站通道适配器允许你将消息有效负载写入 Redis 集合,如以下示例所示:

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />

前面的配置使用 store-inbound-channel-adapter 元素配置 Redis 存储出站通道适配器。 它为各种属性提供值,例如:

  • keykey-expression:所用集合的键名。

  • extract-payload-elements:如果设置为 true(默认值)且有效负载是“多值”对象(即 CollectionMap)的实例,则使用“addAll”和“putAll”语义存储。 否则,如果设置为 false,则无论其类型如何,有效负载都存储为单个条目。 如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。

  • collection-type:此适配器支持的 Collection 类型的枚举。 支持的集合是 LISTSETZSETPROPERTIESMAP

  • map-key-expression:返回所存储条目键名的 SpEL 表达式。 仅当 collection-typeMAPPROPERTIES 且“extract-payload-elements”为 false 时适用。

  • connection-factory:对 o.s.data.redis.connection.RedisConnectionFactory 实例的引用。

  • redis-template:对 o.s.data.redis.core.RedisTemplate 实例的引用。

  • 所有其他入站适配器共有的其他属性(例如“channel”)。

你不能同时设置 redis-templateconnection-factory

默认情况下,适配器使用 StringRedisTemplate。 这使用 StringRedisSerializer 实例用于键、值、哈希键和哈希值。 但是,如果 extract-payload-elements 设置为 false,则将使用 RedisTemplate,其中包含 StringRedisSerializer 实例用于键和哈希键,以及 JdkSerializationRedisSerializer 实例用于值和哈希值。 使用 JDK 序列化器时,重要的是要了解 Java 序列化用于所有值,无论该值是否实际是集合。 如果你需要对值的序列化进行更多控制,请考虑提供自己的 RedisTemplate,而不是依赖这些默认值。

因为它具有 key 和其他属性的字面值,所以前面的示例相对简单和静态。 有时,你可能需要根据某些条件在运行时动态更改这些值。 为此,请使用它们的 -expression 等效项(key-expressionmap-key-expression 等),其中提供的表达式可以是任何有效的 SpEL 表达式。

Redis 出站命令网关

Spring Integration 4.0 引入了 Redis 命令网关,允许你使用通用的 RedisConnection#execute 方法执行任何标准 Redis 命令。 以下列表显示了 Redis 出站网关的可用属性:

<int-redis:outbound-gateway
        request-channel=""  [id="CO3-1"]1
        reply-channel=""  [id="CO3-2"]2
        requires-reply=""  [id="CO3-3"]3
        reply-timeout=""  [id="CO3-4"]4
        connection-factory=""  [id="CO3-5"]5
        redis-template=""  [id="CO3-6"]6
        arguments-serializer=""  [id="CO3-7"]7
        command-expression=""  [id="CO3-8"]8
        argument-expressions=""  [id="CO3-9"]9
        use-command-variable=""  [id="CO3-10"]10
        arguments-strategy="" /> [id="CO3-11"]11
 <1>  此端点从中接收 `Message` 实例的 `MessageChannel`。
 <1>  此端点发送回复 `Message` 实例的 `MessageChannel`。
 <1>  指定此出站网关是否必须返回非空值。
它默认为 `true`。
当 Redis 返回 `null` 值时,将抛出 `ReplyRequiredException`。
 <1>  等待发送回复消息的超时时间(以毫秒为单位)。
它通常适用于基于队列的受限回复通道。
 <1>  对 `RedisConnectionFactory` bean 的引用。
它默认为 `redisConnectionFactory`。
它与“redis-template”属性互斥。
 <1>  对 `RedisTemplate` bean 的引用。
它与“connection-factory”属性互斥。
 <1>  对 `org.springframework.data.redis.serializer.RedisSerializer` 实例的引用。
如果需要,它用于将每个命令参数序列化为 byte[]。
 <1>  返回命令键的 SpEL 表达式。
它默认为 `redis_command` 消息头。
它不能评估为 `null`。
 <1>  逗号分隔的 SpEL 表达式,它们被评估为命令参数。
与 `arguments-strategy` 属性互斥。
如果你不提供任何属性,则 `payload` 用作命令参数。
参数表达式可以评估为“null”以支持可变数量的参数。
 <1>  一个 `boolean` 标志,用于指定当配置 `argument-expressions` 时,评估的 Redis 命令字符串是否在 `o.s.i.redis.outbound.ExpressionArgumentsStrategy` 中的表达式评估上下文中作为 `#cmd` 变量可用。
否则,此属性将被忽略。
 <1>  对 `o.s.i.redis.outbound.ArgumentsStrategy` 实例的引用。
它与 `argument-expressions` 属性互斥。
如果你不提供任何属性,则 `payload` 用作命令参数。

你可以使用 <int-redis:outbound-gateway> 作为通用组件来执行任何所需的 Redis 操作。 以下示例显示了如何从 Redis 原子数获取递增值:

<int-redis:outbound-gateway request-channel="requestChannel"
    reply-channel="replyChannel"
    command-expression="'INCR'"/>

Message 有效负载应该有一个名称 redisCounter,这可能由 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定义提供。

RedisConnection#execute 方法的返回类型是泛型 Object。 实际结果取决于命令类型。 例如,MGET 返回 List<byte[]>。 有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范

Redis 队列出站网关

Spring Integration 引入了 Redis 队列出站网关来执行请求和回复场景。 它将对话 UUID 推送到提供的 queue,将以该 UUID 作为其键的值推送到 Redis 列表,并等待来自 Redis 列表的回复,其键为 UUID.reply。 每次交互都使用不同的 UUID。 以下列表显示了 Redis 出站网关的可用属性:

<int-redis:queue-outbound-gateway
        request-channel=""  [id="CO4-1"]1
        reply-channel=""  [id="CO4-2"]2
        requires-reply=""  [id="CO4-3"]3
        reply-timeout=""  [id="CO4-4"]4
        connection-factory=""  [id="CO4-5"]5
        queue=""  [id="CO4-6"]6
        order=""  [id="CO4-7"]7
        serializer=""  [id="CO4-8"]8
        extract-payload=""/>  [id="CO4-9"]9
 <1>  此端点从中接收 `Message` 实例的 `MessageChannel`。
 <1>  此端点发送回复 `Message` 实例的 `MessageChannel`。
 <1>  指定此出站网关是否必须返回非空值。
此值默认为 `false`。
否则,当 Redis 返回 `null` 值时,将抛出 `ReplyRequiredException`。
 <1>  等待发送回复消息的超时时间(以毫秒为单位)。
它通常适用于基于队列的受限回复通道。
 <1>  对 `RedisConnectionFactory` bean 的引用。
它默认为 `redisConnectionFactory`。
它与“redis-template”属性互斥。
 <1>  出站网关发送对话 `UUID` 的 Redis 列表的名称。
 <1>  当注册多个网关时,此出站网关的顺序。
 <1>  `RedisSerializer` bean 引用。
它可以是空字符串,表示“无序列化器”。
在这种情况下,入站 Redis 消息的原始 `byte[]` 作为 `Message` 有效负载发送到 `channel`。
默认情况下,它是 `JdkSerializationRedisSerializer`。
 <1>  指定此端点是否期望 Redis 队列中的数据包含整个 `Message` 实例。
如果此属性设置为 `true`,则 `serializer` 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。

Redis 队列入站网关

Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。 它从提供的 queue 弹出对话 UUID,从 Redis 列表弹出以该 UUID 作为其键的值,并将回复推送到键为 UUID.reply 的 Redis 列表。 以下列表显示了 Redis 队列入站网关的可用属性:

<int-redis:queue-inbound-gateway
        request-channel=""  [id="CO5-1"]1
        reply-channel=""  [id="CO5-2"]2
        executor=""  [id="CO5-3"]3
        reply-timeout=""  [id="CO5-4"]4
        connection-factory=""  [id="CO5-5"]5
        queue=""  [id="CO5-6"]6
        order=""  [id="CO5-7"]7
        serializer=""  [id="CO5-8"]8
        receive-timeout=""  [id="CO5-9"]9
        expect-message=""  [id="CO5-10"]10
        recovery-interval=""/>  [id="CO5-11"]11
 <1>  此端点将从 Redis 数据创建的 `Message` 实例发送到的 `MessageChannel`。
 <1>  此端点从中等待回复 `Message` 实例的 `MessageChannel`。
可选 - `replyChannel` 标头仍在使用中。
 <1>  对 Spring `TaskExecutor`(或标准 JDK `Executor`)bean 的引用。
它用于底层侦听任务。
它默认为 `SimpleAsyncTaskExecutor`。
 <1>  等待发送回复消息的超时时间(以毫秒为单位)。
它通常适用于基于队列的受限回复通道。
 <1>  对 `RedisConnectionFactory` bean 的引用。
它默认为 `redisConnectionFactory`。
它与“redis-template”属性互斥。
 <1>  对话 `UUID` 的 Redis 列表的名称。
 <1>  当注册多个网关时,此入站网关的顺序。
 <1>  `RedisSerializer` bean 引用。
它可以是空字符串,表示“无序列化器”。
在这种情况下,入站 Redis 消息的原始 `byte[]` 作为 `Message` 有效负载发送到 `channel`。
它默认为 `JdkSerializationRedisSerializer`。
(请注意,在 4.3 版本之前,它默认是 `StringRedisSerializer`。
要恢复该行为,请提供对 `StringRedisSerializer` 的引用)。
 <1>  等待接收消息的超时时间(以毫秒为单位)。
它通常适用于基于队列的受限请求通道。
 <1>  指定此端点是否期望 Redis 队列中的数据包含整个 `Message` 实例。
如果此属性设置为 `true`,则 `serializer` 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。
 <1>  侦听器任务在“右弹出”操作发生异常后应休眠的时间(以毫秒为单位),然后重新启动侦听器任务。

task-executor 必须配置为具有多个线程进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动侦听器任务时,可能会发生死锁。 errorChannel 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露给可能的死锁情况。 有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册

Redis Stream 出站通道适配器

Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道适配器,用于将消息有效负载写入 Redis Stream。 出站通道适配器使用 ReactiveStreamOperations.add(…​)Record 添加到流中。 以下示例显示了如何将 Java 配置和 Service 类用于 Redis Stream 出站通道适配器。

@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); [id="CO6-1"]1
    reactiveStreamMessageHandler.setSerializationContext(serializationContext); [id="CO6-2"]2
    reactiveStreamMessageHandler.setHashMapper(hashMapper); [id="CO6-3"]3
    reactiveStreamMessageHandler.setExtractPayload(true); [id="CO6-4"]4
    return reactiveStreamMessageHandler;
}
 <1>  使用 `ReactiveRedisConnectionFactory` 和流名称构造 `ReactiveRedisStreamMessageHandler` 实例以添加记录。
另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。
 <1>  设置用于在添加到流之前序列化记录键和值的 `RedisSerializationContext`。
 <1>  设置 `HashMapper`,它提供 Java 类型和 Redis 哈希/映射之间的契约。
 <1>  如果为 `true`,通道适配器将从请求消息中提取有效负载以添加流记录。
或者使用整个消息作为值。
它默认为 `true`。

从 6.5 版本开始,ReactiveRedisStreamMessageHandler 提供了一个 setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction),用于基于请求消息构建内部 ReactiveStreamOperations.add(Record<K, ?> record, XAddOptions xAddOptions) 调用的 RedisStreamCommands.XAddOptions

Redis Stream 入站通道适配器

Spring Integration 5.4 引入了 Reactive Stream 入站通道适配器,用于从 Redis Stream 读取消息。 入站通道适配器根据自动确认标志使用 StreamReceiver.receive(…​)StreamReceiver.receiveAutoAck() 从 Redis Stream 读取记录。 以下示例显示了如何将 Java 配置用于 Redis Stream 入站通道适配器。

@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); [id="CO7-1"]1
    messageProducer.setStreamReceiverOptions( [id="CO7-2"]2
                StreamReceiver.StreamReceiverOptions.builder()
                      .pollTimeout(Duration.ofMillis(100))
                      .build());
    messageProducer.setAutoStartup(true); [id="CO7-3"]3
    messageProducer.setAutoAck(false); [id="CO7-4"]4
    messageProducer.setCreateConsumerGroup(true); [id="CO7-5"]5
    messageProducer.setConsumerGroup("my-group"); [id="CO7-6"]6
    messageProducer.setConsumerName("my-consumer"); [id="CO7-7"]7
    messageProducer.setOutputChannel(fromRedisStreamChannel); [id="CO7-8"]8
    messageProducer.setReadOffset(ReadOffset.latest()); [id="CO7-9"]9
    messageProducer.extractPayload(true); [id="CO7-10"]10
    return messageProducer;
}
 <1>  使用 `ReactiveRedisConnectionFactory` 和流键构造 `ReactiveRedisStreamMessageProducer` 实例以读取记录。
 <1>  一个 `StreamReceiver.StreamReceiverOptions`,用于使用反应式基础设施消费 redis 流。
 <1>  一个 `SmartLifecycle` 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为 `true`。
如果为 `false`,则应手动启动 `RedisStreamMessageProducer` (`messageProducer.start()`)。
 <1>  如果为 `false`,则收到的消息不会自动确认。
消息的确认将推迟到客户端消费消息。
它默认为 `true`。
 <1>  如果为 `true`,则将创建一个消费者组。
在创建消费者组期间,也将创建流(如果尚不存在)。
消费者组跟踪消息传递并区分消费者。
它默认为 `false`。
 <1>  设置消费者组名称。
它默认为定义的 bean 名称。
 <1>  设置消费者名称。
从组 `my-group` 读取消息作为 `my-consumer`。
 <1>  此端点将消息发送到的消息通道。
 <1>  定义读取消息的偏移量。
它默认为 `ReadOffset.latest()`。
 <1>  如果为 `true`,通道适配器将从 `Record` 中提取有效负载值。
否则,整个 `Record` 用作有效负载。
它默认为 `true`。

如果 autoAck 设置为 false,则 Redis Stream 中的 Record 不会被 Redis 驱动程序自动确认,而是将 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 标头添加到要生成的消息中,其值为 SimpleAcknowledgment 实例。 当业务逻辑完成基于此类记录的消息时,目标集成流负责调用其 acknowledge() 回调。 即使在反序列化期间发生异常并配置了 errorChannel,也需要类似的逻辑。 因此,目标错误处理程序必须决定是否确认或拒绝此类失败消息。 除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 之外,ReactiveRedisStreamMessageProducer 还会将这些标头填充到要生成的消息中:RedisHeaders.STREAM_KEYRedisHeaders.STREAM_MESSAGE_IDRedisHeaders.CONSUMER_GROUPRedisHeaders.CONSUMER

从 5.5 版本开始,你可以在 ReactiveRedisStreamMessageProducer 上显式配置 StreamReceiver.StreamReceiverOptionsBuilder 选项,包括新引入的 onErrorResume 函数,如果 Redis Stream 消费者在发生反序列化错误时应继续轮询,则需要此函数。 默认函数将消息发送到错误通道(如果提供),并可能对失败消息进行确认,如上所述。 所有这些 StreamReceiver.StreamReceiverOptionsBuilder 都与外部提供的 StreamReceiver.StreamReceiverOptions 互斥。

Redis 锁注册表

Spring Integration 4.0 引入了 RedisLockRegistry。 某些组件(例如,聚合器和重新排序器)使用从 LockRegistry 实例获取的锁来确保一次只有一个线程操作一个组。 DefaultLockRegistry 在单个组件中执行此功能。 你现在可以在这些组件上配置外部锁注册表。 当与共享 MessageGroupStore 一起使用时,你可以使用 RedisLockRegistry 在多个应用程序实例之间提供此功能,这样一次只有一个实例可以操作该组。

当本地线程释放锁时,另一个本地线程通常可以立即获取锁。 如果锁由使用不同注册表实例的线程释放,则获取锁可能需要长达 100 毫秒。

为了避免“挂起”的锁(当服务器发生故障时),此注册表中的锁在默认的 60 秒后过期,但你可以在注册表上配置此值。 锁通常持有时间要短得多。

由于键可能会过期,尝试解锁过期锁会导致抛出异常。 但是,受此类锁保护的资源可能已受到损害,因此此类异常应被视为严重异常。 你应该将过期时间设置得足够大,以防止这种情况发生,但要设置得足够低,以便在服务器故障后能在合理的时间内恢复锁。

从 5.0 版本开始,RedisLockRegistry 实现了 ExpirableLockRegistry,它会删除最后获取时间超过 age 且当前未锁定的锁。

从 5.5.6 版本开始,RedisLockRegistry 支持通过 RedisLockRegistry.setCacheCapacity() 自动清理 RedisLockRegistry.locks 中的 redisLocks 缓存。 有关更多信息,请参阅其 JavaDocs。

从 5.5.13 版本开始,RedisLockRegistry 公开了一个 setRedisLockType(RedisLockType) 选项,用于确定 Redis 锁获取应以何种模式发生:

  • RedisLockType.SPIN_LOCK - 通过定期循环(100 毫秒)检查是否可以获取锁来获取锁。 默认。

  • RedisLockType.PUB_SUB_LOCK - 通过 redis 发布-订阅订阅获取锁。

发布-订阅是首选模式——客户端 Redis 服务器之间的网络通信更少,性能更高——当订阅在另一个进程中收到解锁通知时,锁会立即获取。 但是,Redis 不支持主/副本连接中的发布-订阅(例如在 AWS ElastiCache 环境中),因此选择忙等待模式作为默认值,以使注册表在任何环境中都能工作。

从 6.4 版本开始,RedisLockRegistry.RedisLock.unlock() 方法不再抛出 IllegalStateException,而是抛出 ConcurrentModificationException,如果锁的所有权已过期。

从 6.4 版本开始,添加了 RedisLockRegistry.setRenewalTaskScheduler() 来配置用于定期续订锁的调度程序。 设置后,锁将在成功获取锁后每 1/3 的过期时间自动续订,直到解锁或 redis 键被删除。