Redis Streams
Redis Streams model a log data structure in an abstract way. Typically, logs are append-only data structures that are consumed from the beginning, from a random position, or by streaming new messages.
Learn more about Redis Streams in the Redis reference documentation.
Redis Streams can be roughly divided into two areas of functionality:
- Appending records
- Consuming records
Although this pattern has similarities to Pub/Sub, the main difference lies in the persistence of messages and how they are consumed.
While Pub/Sub relies on the broadcasting of transient messages (that is, if you don’t listen, you miss a message), Redis Streams use a persistent, append-only data type that retains messages until the stream is trimmed. Another difference lies in consumption: Pub/Sub registers a server-side subscription and Redis pushes arriving messages to the client, while Redis Streams require active polling.
The org.springframework.data.redis.connection and org.springframework.data.redis.stream packages provide the core functionality for Redis Streams.
Appending
To send a record, you can use, as with the other operations, either the low-level RedisConnection or the high-level StreamOperations. Both entities offer the add (xAdd) method, which accepts the record and the destination stream as arguments. While RedisConnection requires raw data (array of bytes), the StreamOperations lets arbitrary objects be passed in as records, as shown in the following example:
// append message through connection
RedisConnection con = …
byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);
// append message through RedisTemplate
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);
Stream records carry a Map (key-value tuples) as their payload. Appending a record to a stream returns a RecordId that can be used as further reference.
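For illustration, the following sketch appends a map payload through StreamOperations and captures the returned RecordId. It assumes a configured StringRedisTemplate named template; the field name eventType is purely illustrative:

// append a map record and keep the generated RecordId for later reference
RecordId id = template.opsForStream()
		.add(StreamRecords.newRecord()
				.in("my-stream")
				.ofMap(Map.of("eventType", "user-logon")));

// RecordIds encode the entry position, e.g. "1693906710000-0" (milliseconds-sequence)
System.out.println(id.getValue());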
Consuming
On the consuming side, one can consume one or multiple streams. Redis Streams provide read commands that allow consumption of the stream from an arbitrary position (random access) within the known stream content and beyond the stream end to consume new stream records.
At the low level, RedisConnection offers the xRead and xReadGroup methods that map the Redis commands for reading and for reading within a consumer group, respectively. Note that multiple streams can be used as arguments.
Subscription commands in Redis can be blocking. That is, calling xRead on a connection causes the current thread to block while it waits for messages; the thread is released only if the read command times out or receives a message.
To consume stream messages, one can either poll for messages in application code or use one of the two asynchronous message listener containers (see Asynchronous reception through Message Listener Containers), the imperative or the reactive one. Each time a new record arrives, the container notifies the application code.
Synchronous reception
While stream consumption is typically associated with asynchronous processing, it is possible to consume messages synchronously. The overloaded StreamOperations.read(…) methods provide this functionality. During a synchronous receive, the calling thread potentially blocks until a message becomes available. The StreamReadOptions.block property specifies how long the receiver should wait before giving up on a message.
// Read messages through RedisTemplate
RedisTemplate template = …

// standalone read: up to two messages, starting at the latest offset
List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
				StreamOffset.latest("my-stream"));

// consumer-group read: up to two messages after the group's last consumed offset
List<MapRecord<K, HK, HV>> messagesInGroup = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
				StreamReadOptions.empty().count(2),
				StreamOffset.create("my-stream", ReadOffset.lastConsumed()));
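A synchronous read returns immediately when no message is pending unless a blocking duration is configured. As a minimal sketch, reusing the template variable from above, the following waits up to two seconds for new entries via StreamReadOptions.block:

// block the calling thread for up to 2 seconds while waiting for up to 10 messages
List<MapRecord<K, HK, HV>> blocked = template.opsForStream().read(
		StreamReadOptions.empty().block(Duration.ofSeconds(2)).count(10),
		StreamOffset.latest("my-stream"));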
Asynchronous reception through Message Listener Containers
Due to its blocking nature, low-level polling is not attractive, as it requires connection and thread management for every single consumer. To alleviate this problem, Spring Data offers message listeners, which do all the heavy lifting. If you are familiar with EJB and JMS, you should find the concepts familiar, as it is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).
Spring Data ships with two implementations tailored to the used programming model:
- StreamMessageListenerContainer acts as a message listener container for imperative programming models. It is used to consume records from a Redis Stream and drive the StreamListener instances that are injected into it.
- StreamReceiver provides a reactive variant of a message listener. It is used to consume messages from a Redis Stream as a potentially infinite stream and emit stream messages through a Flux.
StreamMessageListenerContainer and StreamReceiver are responsible for all threading of message reception and dispatch into the listener for processing. A message listener container/receiver is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you, as an application developer, write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegate boilerplate Redis infrastructure concerns to the framework.
Both containers allow runtime configuration changes, so you can add or remove subscriptions while an application is running without the need for a restart. Additionally, the containers use a lazy subscription approach, using a RedisConnection only when needed. If all the listeners are unsubscribed, a cleanup is performed automatically and the thread is released.
Imperative StreamMessageListenerContainer
In a fashion similar to a Message-Driven Bean (MDB) in the EJB world, the Stream-Driven POJO (SDP) acts as a receiver for stream messages. The one restriction on an SDP is that it must implement the org.springframework.data.redis.stream.StreamListener interface. Please also be aware that, in the case where your POJO receives messages on multiple threads, it is important to ensure that your implementation is thread-safe.
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
}
}
StreamListener represents a functional interface, so implementations can be rewritten using their lambda form:
message -> {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
};
Once you’ve implemented your StreamListener, it’s time to create a message listener container and register a subscription:
RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
containerOptions);
Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);
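Creating the container does not start polling by itself. As a brief lifecycle sketch, reusing the container and subscription variables from above and assuming the container runs outside a managed Spring application context (StreamMessageListenerContainer implements SmartLifecycle):

container.start();        // begin polling and dispatching records to the listener

// ... application runs ...

subscription.cancel();    // cancel just this subscription
container.stop();         // stop polling and release acquired resources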
Please refer to the Javadoc of the various message listener containers for a full description of the features supported by each implementation.
Reactive StreamReceiver
Reactive consumption of streaming data sources typically happens through a Flux of events or messages. The reactive receiver implementation is provided with StreamReceiver and its overloaded receive(…) methods. The reactive approach requires fewer infrastructure resources, such as threads, in comparison to StreamMessageListenerContainer, as it leverages the threading resources provided by the driver. The receiving stream is a demand-driven publisher of StreamMessage:
Flux<MapRecord<String, String, String>> messages = …

return messages.doOnNext(it -> {
	System.out.println("MessageId: " + it.getId());
	System.out.println("Stream: " + it.getStream());
	System.out.println("Body: " + it.getValue());
});
Now we need to create the StreamReceiver and register a subscription to consume stream messages:
ReactiveRedisConnectionFactory connectionFactory = …
StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);
Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));
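The returned Flux is lazy: nothing is read from Redis until a subscriber signals demand. A minimal sketch, reusing the messages variable from the snippet above; polling stops when the subscription is disposed:

// polling starts on subscribe and stops when the subscription is disposed
Disposable disposable = messages
		.doOnNext(msg -> System.out.println("Body: " + msg.getValue()))
		.subscribe();

// later, e.g. on shutdown:
disposable.dispose();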
Please refer to the Javadoc of the various message listener containers for a full description of the features supported by each implementation.
Demand-driven consumption uses backpressure signals to activate and deactivate polling.
Acknowledge strategies
When you read messages via a Consumer Group, the server remembers that a given message was delivered and adds it to the Pending Entries List (PEL), the list of messages that have been delivered but not yet acknowledged. Messages have to be acknowledged via StreamOperations.acknowledge in order to be removed from the Pending Entries List, as shown in the snippet below.
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...
container.receive(Consumer.from("my-group", "my-consumer"), 1
StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
msg -> {
// ...
redisTemplate.opsForStream().acknowledge("my-group", msg); 2
});
1 | Read as my-consumer from group my-group. Received messages are not acknowledged. |
2 | Acknowledge the message after processing. |
To auto acknowledge messages on receive, use receiveAutoAck instead of receive.
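As a sketch of the auto-acknowledging variant, reusing the container, group, and stream names from the example above:

// receiveAutoAck acknowledges each message as part of reading it
container.receiveAutoAck(Consumer.from("my-group", "my-consumer"),
		StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
		msg -> {
			// process the message; no explicit acknowledge(...) call is required
		});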
ReadOffset strategies
Stream read operations accept a read offset specification to consume messages from the given offset onward. ReadOffset represents the read offset specification. Redis supports 3 variants of offsets, depending on whether you consume the stream standalone or within a consumer group:
- ReadOffset.latest() – Read the latest message.
- ReadOffset.from(…) – Read after a specific message Id.
- ReadOffset.lastConsumed() – Read after the last consumed message Id (consumer-group only).
In the context of container-based message consumption, we need to advance (or increment) the read offset when consuming a message. Advancing depends on the requested ReadOffset and the consumption mode (with/without consumer groups). The following matrix explains how containers advance ReadOffset:
Read offset | Standalone | Consumer Group |
---|---|---|
Latest | Read latest message | Read latest message |
Specific Message Id | Use last seen message as the next MessageId | Use last seen message as the next MessageId |
Last Consumed | Use last seen message as the next MessageId | Last consumed message as per consumer group |
Reading from a specific message Id or from the last consumed message can be considered a safe operation that ensures consumption of all messages that were appended to the stream. Using the latest message for the read can skip messages that were added to the stream while the poll operation was in a dead-time state. Polling introduces a dead time in which messages can arrive between individual polling commands. Stream consumption is not a linear contiguous read but is split into repeating XREAD calls.
Serialization
Any record sent to the stream needs to be serialized to its binary format. Due to the stream's closeness to the hash data structure, the stream key, field names, and values use the corresponding serializers configured on the RedisTemplate.
Stream Property | Serializer | Description |
---|---|---|
key | keySerializer | used for the stream key |
field | hashKeySerializer | used for each map key in the payload |
value | hashValueSerializer | used for each map value in the payload |
Please make sure to review the RedisSerializers in use, and note that if you decide not to use any serializer, you need to make sure those values are binary already.
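As a hedged example, the following configures plain string serializers on a RedisTemplate so that stream keys, field names, and values all end up human-readable in Redis; the connectionFactory variable is assumed to be available:

RedisTemplate<String, String> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
template.setKeySerializer(RedisSerializer.string());       // stream key
template.setHashKeySerializer(RedisSerializer.string());   // payload field names
template.setHashValueSerializer(RedisSerializer.string()); // payload values
template.afterPropertiesSet();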
Object Mapping
Simple Values
StreamOperations allows appending simple values, via ObjectRecord, directly to the stream without having to put those values into a Map structure. The value is then assigned to a payload field and can be extracted when reading back the value.
ObjectRecord<String, String> record = StreamRecords.newRecord()
.in("my-stream")
.ofObject("my-value");
redisTemplate()
.opsForStream()
.add(record); 1
List<ObjectRecord<String, String>> records = redisTemplate()
.opsForStream()
.read(String.class, StreamOffset.fromStart("my-stream"));
1 | XADD my-stream * "_class" "java.lang.String" "_raw" "my-value" |
ObjectRecords pass through the very same serialization process as all other records, so the record can also be obtained using the untyped read operation that returns a MapRecord.
Complex Values
Adding a complex value to the stream can be done in 3 ways:
- Convert to a simple value using, for example, a String JSON representation.
- Serialize the value with a suitable RedisSerializer.
- Convert the value into a Map suitable for serialization using a HashMapper.
The first variant is the most straightforward one but neglects the field-value capabilities offered by the stream structure; still, the values in the stream remain readable for other consumers. The second option holds the same benefits as the first one but may lead to very specific consumer limitations, as all consumers must implement the very same serialization mechanism. The HashMapper approach is a bit more complex, making use of the stream's hash structure but flattening the source. Other consumers remain able to read the records as long as suitable serializer combinations are chosen.
HashMappers convert the payload to a Map with specific types.
ObjectRecord<String, User> record = StreamRecords.newRecord()
.in("user-logon")
.ofObject(new User("night", "angel"));
redisTemplate()
.opsForStream()
.add(record); 1
List<ObjectRecord<String, User>> records = redisTemplate()
.opsForStream()
.read(User.class, StreamOffset.fromStart("user-logon"));
1 | XADD user-logon * "_class" "com.example.User" "firstname" "night" "lastname" "angel" |
StreamOperations uses ObjectHashMapper by default. You may provide a HashMapper suitable for your requirements when obtaining StreamOperations.
redisTemplate()
.opsForStream(new Jackson2HashMapper(true))
.add(record); 1
1 | XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel" |
A StreamMessageListenerContainer may not be aware of any @TypeAlias used on domain types, as those need to be resolved through a MappingContext. Make sure to initialize RedisMappingContext with an initialEntitySet.
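A minimal sketch of such an initialization, assuming the User domain type from the earlier examples; the mapping context is pre-populated so that type aliases can be resolved before the first message arrives:

RedisMappingContext mappingContext = new RedisMappingContext();
mappingContext.setInitialEntitySet(Collections.singleton(User.class)); // register domain types up front
mappingContext.afterPropertiesSet(); // scans the initial entity set eagerly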