Data Buffers and Codecs

Java NIO provides ByteBuffer, but many libraries build their own byte buffer API on top, especially for network operations where reusing buffers and/or using direct buffers is beneficial for performance. For example, Netty has the ByteBuf hierarchy, Undertow uses XNIO, Jetty uses pooled byte buffers with a callback to be released, and so on. The spring-core module provides a set of abstractions to work with various byte buffer APIs as follows:

DataBufferFactory

DataBufferFactory is used to create data buffers in one of two ways:

  1. Allocate a new data buffer, optionally specifying the capacity up front, if known, which is more efficient even though DataBuffer implementations can grow and shrink on demand.

  2. Wrap an existing byte[] or java.nio.ByteBuffer, which decorates the given data with a DataBuffer implementation and does not involve allocation.
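The two creation modes can be sketched with the default, non-pooled factory. This is a minimal sketch that assumes Spring Framework 5.3 or later, for DefaultDataBufferFactory.sharedInstance. The class and method names are hypothetical. The allocated buffer deliberately starts at 5 bytes and grows when more is written. The wrapped buffer shares the caller's byte[] instead of copying it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

class FactoryExample {

    // Shared instance of the default heap-based, non-pooled factory.
    static final DataBufferFactory FACTORY = DefaultDataBufferFactory.sharedInstance;

    // 1. Allocate: give an initial capacity when the size is known up front.
    //    The buffer still expands if more bytes are written than expected.
    static DataBuffer allocateGreeting() {
        DataBuffer buffer = FACTORY.allocateBuffer(5);
        buffer.write("hello".getBytes(StandardCharsets.UTF_8));
        buffer.write(", world".getBytes(StandardCharsets.UTF_8)); // grows past 5 bytes
        return buffer;
    }

    // 2. Wrap: decorates existing data; the byte[] is shared, not copied.
    static DataBuffer wrapBytes(byte[] bytes) {
        return FACTORY.wrap(bytes);
    }

    static DataBuffer wrapByteBuffer(ByteBuffer byteBuffer) {
        return FACTORY.wrap(byteBuffer);
    }
}
```

Because wrapping only decorates the array, a later change to the array is visible through the DataBuffer.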

Note that WebFlux applications do not create a DataBufferFactory directly but instead access it through the ServerHttpResponse or the ClientHttpRequest on the client side. The type of factory depends on the underlying client or server, e.g. NettyDataBufferFactory for Reactor Netty, DefaultDataBufferFactory for others.
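On the server side, a handler obtains the factory from the response rather than constructing one. This is a minimal sketch assuming spring-web and Reactor on the classpath; the class and method names are hypothetical. ServerHttpResponse.bufferFactory() and writeWith(...) come from ReactiveHttpOutputMessage, which ClientHttpRequest also extends, so a client request exposes the same bufferFactory() method.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;

class ResponseFactoryExample {

    // Writes a text body using whichever factory the runtime supplies,
    // e.g. NettyDataBufferFactory on Reactor Netty; no factory is created here.
    static Mono<Void> writeText(ServerHttpResponse response, String text) {
        DataBuffer buffer = response.bufferFactory()
                .wrap(text.getBytes(StandardCharsets.UTF_8));
        return response.writeWith(Mono.just(buffer));
    }
}
```

In tests, the same method can be exercised against MockServerHttpResponse from spring-test, whose factory is a DefaultDataBufferFactory.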

DataBuffer

The DataBuffer interface offers operations similar to those of java.nio.ByteBuffer but also brings a few additional benefits, some of which are inspired by the Netty ByteBuf. Below is a partial list of benefits:

  • Read and write with independent positions, i.e. not requiring a call to flip() to alternate between read and write.

  • Capacity expanded on demand as with java.lang.StringBuilder.

  • Pooled buffers and reference counting via PooledDataBuffer.

  • View a buffer as java.nio.ByteBuffer, InputStream, or OutputStream.

  • Determine the index, or the last index, for a given byte.
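Several of these benefits can be seen together in one small parsing routine. This is a sketch assuming Spring Framework 5.3 or later and a hypothetical "key=value;" input format; the class and method names are made up for illustration. The buffer is written and then read without any flip(). It expands past its initial 4 bytes. indexOf and lastIndexOf return absolute indexes, and asInputStream shares the buffer's read position.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

class DataBufferFeatures {

    // Splits a hypothetical "key=value;" input into {key, value}.
    static String[] parse(String text) {
        // Deliberately tiny initial capacity: the buffer expands on demand.
        DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(4);

        // Writing advances only the write position; no flip() before reading.
        buffer.write(text.getBytes(StandardCharsets.UTF_8));

        // Absolute index of the first '=' and of the last ';' (-1 if absent).
        int eq = buffer.indexOf(b -> b == '=', buffer.readPosition());
        int end = buffer.lastIndexOf(b -> b == ';', buffer.writePosition() - 1);

        // Reading advances only the read position.
        byte[] key = new byte[eq - buffer.readPosition()];
        buffer.read(key);
        buffer.read(); // consume the '='

        // Read the value through an InputStream view that shares the read position.
        byte[] value = new byte[end - buffer.readPosition()];
        try (InputStream in = buffer.asInputStream()) {
            in.readNBytes(value, 0, value.length);
        }
        catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return new String[] {
                new String(key, StandardCharsets.UTF_8),
                new String(value, StandardCharsets.UTF_8)};
    }
}
```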

PooledDataBuffer

ByteBuffer 的 Javadoc 中所述,字节缓冲区可以是直接的或非直接的。直接缓冲区可能会驻留在 Java 堆之外,这消除了对于本地 IO 操作复制的需要。这使得直接缓冲区对于通过套接字接收和发送数据特别有用,但它们创建和释放起来也更昂贵,这会导致池缓冲区的想法。

As explained in the Javadoc for ByteBuffer, byte buffers can be direct or non-direct. Direct buffers may reside outside the Java heap which eliminates the need for copying for native I/O operations. That makes direct buffers particularly useful for receiving and sending data over a socket, but they’re also more expensive to create and release, which leads to the idea of pooling buffers.
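At the plain JDK level, the distinction comes down to which ByteBuffer factory method is called. This minimal sketch uses only java.nio, and its class and method names are hypothetical. On the Spring side, DefaultDataBufferFactory has a preferDirect constructor flag (new DefaultDataBufferFactory(true)) that selects direct allocation for the buffers it creates.

```java
import java.nio.ByteBuffer;

class DirectBuffers {

    // Non-direct (heap) buffer, backed by a byte[] on the Java heap.
    static ByteBuffer heap(int capacity) {
        return ByteBuffer.allocate(capacity);
    }

    // Direct buffer: the JVM makes a best effort to perform native I/O on it
    // without an intermediate copy, but allocation and deallocation are
    // typically more costly than for heap buffers.
    static ByteBuffer direct(int capacity) {
        return ByteBuffer.allocateDirect(capacity);
    }
}
```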

PooledDataBufferDataBuffer 的扩展,它有助于引用计数,这对于字节缓冲区池化至关重要。它如何工作?当分配 PooledDataBuffer 时,引用计数为 1。对 retain() 的调用会增加计数,而对 release() 的调用会减少计数。只要计数大于 0,则保证不会释放缓冲区。当计数减少到 0 时,可以释放池化缓冲区,实际上这意味着为缓冲区保留的内存被返回到内存池。

PooledDataBuffer is an extension of DataBuffer that helps with reference counting which is essential for byte buffer pooling. How does it work? When a PooledDataBuffer is allocated the reference count is at 1. Calls to retain() increment the count, while calls to release() decrement it. As long as the count is above 0, the buffer is guaranteed not to be released. When the count is decreased to 0, the pooled buffer can be released, which in practice could mean the reserved memory for the buffer is returned to the memory pool.
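The counting rules can be observed with Netty-backed buffers, since NettyDataBuffer implements PooledDataBuffer. This is a sketch assuming spring-core and netty-buffer on the classpath, using Netty's PooledByteBufAllocator.DEFAULT; the class and method names are hypothetical. release() returns true only for the call that brings the count to 0.

```java
import io.netty.buffer.PooledByteBufAllocator;

import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;

class RefCountingExample {

    // Netty-backed factory; its buffers are PooledDataBuffer instances.
    static final NettyDataBufferFactory FACTORY =
            new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);

    // Returns {firstRelease, allocatedAfterFirst, secondRelease, allocatedAtEnd}.
    static boolean[] lifecycle() {
        PooledDataBuffer buffer = (PooledDataBuffer) FACTORY.allocateBuffer(64); // count = 1
        buffer.retain();                               // count = 2, e.g. a second holder
        boolean firstRelease = buffer.release();       // count = 1 -> false
        boolean allocatedAfterFirst = buffer.isAllocated();
        boolean secondRelease = buffer.release();      // count = 0 -> true, memory back to the pool
        return new boolean[] {firstRelease, allocatedAfterFirst, secondRelease, buffer.isAllocated()};
    }
}
```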

Note that instead of operating on PooledDataBuffer directly, in most cases it’s better to use the convenience methods in DataBufferUtils that apply release or retain to a DataBuffer only if it is an instance of PooledDataBuffer.
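With the DataBufferUtils methods, code can release any DataBuffer without checking its type first. This is a minimal sketch assuming Spring Framework 5.3 or later; the class name and the wrap helper are hypothetical. For a non-pooled DefaultDataBuffer, DataBufferUtils.release(...) is a no-op that returns false and retain(...) returns the same instance. For a PooledDataBuffer, both calls adjust the reference count.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

class SafeReleaseExample {

    // Hypothetical helper producing a non-pooled buffer.
    static DataBuffer wrap(String text) {
        return DefaultDataBufferFactory.sharedInstance.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    // Reads a buffer as text, then releases it whatever its implementation.
    static String readAndRelease(DataBuffer buffer) {
        try {
            return buffer.toString(StandardCharsets.UTF_8);
        }
        finally {
            // Decrements the count for PooledDataBuffer; otherwise does nothing.
            DataBufferUtils.release(buffer);
        }
    }
}
```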

DataBufferUtils

DataBufferUtils offers a number of utility methods to operate on data buffers:

  • Join a stream of data buffers into a single buffer, possibly with zero copy, e.g. via composite buffers, if that's supported by the underlying byte buffer API.

  • InputStream 或 NIO Channel 转换为 Flux<DataBuffer>,反之亦然,将 Publisher<DataBuffer> 转换为 OutputStream 或 NIO Channel

  • Turn InputStream or NIO Channel into Flux<DataBuffer>, and vice versa a Publisher<DataBuffer> into OutputStream or NIO Channel.

  • Methods to release or retain a DataBuffer if the buffer is an instance of PooledDataBuffer.

  • Skip or take from a stream of bytes until a specific byte count.
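Reading a stream into buffers and joining them back can be combined in one pipeline. This is a sketch assuming Spring Framework 5.3 or later plus Reactor; the class and method names are hypothetical. readInputStream opens the stream lazily on subscription and closes it when the Flux terminates. join releases the source buffers as part of the joined result, so only the joined buffer is released here.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class StreamToBuffers {

    static Mono<String> readAll(byte[] content, int chunkSize) {
        // InputStream -> Flux<DataBuffer>, at most chunkSize bytes per buffer.
        Flux<DataBuffer> chunks = DataBufferUtils.readInputStream(
                () -> new ByteArrayInputStream(content),
                DefaultDataBufferFactory.sharedInstance,
                chunkSize);

        // Flux<DataBuffer> -> one DataBuffer; we own (and release) the result.
        return DataBufferUtils.join(chunks)
                .map(joined -> {
                    try {
                        return joined.toString(StandardCharsets.UTF_8);
                    }
                    finally {
                        DataBufferUtils.release(joined);
                    }
                });
    }
}
```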

Codecs

The org.springframework.core.codec package provides the following strategy interfaces:

  • EncoderPublisher&lt;T&gt; 编码为数据缓冲区流。

  • Encoder to encode Publisher<T> into a stream of data buffers.

  • DecoderPublisher&lt;DataBuffer&gt; 解码为更高层次对象流。

  • Decoder to decode Publisher<DataBuffer> into a stream of higher level objects.

The spring-core module provides byte[], ByteBuffer, DataBuffer, Resource, and String encoder and decoder implementations. The spring-web module adds Jackson JSON, Jackson Smile, JAXB2, Protocol Buffers and other encoders and decoders. See Codecs in the WebFlux section.
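A round trip through the spring-core String codecs shows both strategy interfaces at work. This sketch assumes Spring Framework 5.3 or later; the class and method names are hypothetical. CharSequenceEncoder turns each String into one DataBuffer. StringDecoder, with its default settings, splits the incoming bytes on "\r\n" and "\n", strips the delimiters, and joins lines that span several buffers.

```java
import java.util.Collections;
import java.util.List;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.util.MimeTypeUtils;
import reactor.core.publisher.Flux;

class CodecRoundTrip {

    static List<String> roundTrip(String... values) {
        // Encoder: Publisher<CharSequence> -> stream of data buffers.
        Flux<DataBuffer> encoded = CharSequenceEncoder.textPlainOnly().encode(
                Flux.fromArray(values),
                DefaultDataBufferFactory.sharedInstance,
                ResolvableType.forClass(String.class),
                MimeTypeUtils.TEXT_PLAIN,
                Collections.emptyMap());

        // Decoder: Publisher<DataBuffer> -> stream of higher level objects (lines).
        return StringDecoder.textPlainOnly().decode(
                        encoded,
                        ResolvableType.forClass(String.class),
                        MimeTypeUtils.TEXT_PLAIN,
                        Collections.emptyMap())
                .collectList()
                .block();
    }
}
```

For example, the chunks "fo", "o\nbar\n" and "baz" decode to the lines foo, bar and baz. The first line spans two buffers, and the last one has no trailing delimiter.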

Using DataBuffer

When working with data buffers, special care must be taken to ensure buffers are released since they may be pooled. We'll use codecs to illustrate how that works, but the concepts apply more generally. Let's see what codecs must do internally to manage data buffers.

A Decoder is the last to read input data buffers, before creating higher level objects, and therefore it must release them as follows:

  1. If a Decoder simply reads each input buffer and is ready to release it immediately, it can do so via DataBufferUtils.release(dataBuffer).

  2. If a Decoder is using Flux or Mono operators such as flatMap, reduce, and others that prefetch and cache data items internally, or is using operators such as filter, skip, and others that leave out items, then doOnDiscard(DataBuffer.class, DataBufferUtils::release) must be added to the composition chain to ensure such buffers are released prior to being discarded, possibly also as a result of an error or cancellation signal.

  3. If a Decoder holds on to one or more data buffers in any other way, it must ensure they are released when fully read, or in case of error or cancellation signals that take place before the cached data buffers have been read and released.
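The first two rules might look like this in a simple decode step. This is a hypothetical sketch, not Spring's own decoder code, assuming spring-core and Reactor 3.3 or later. Buffers that pass the filter are read fully and released right away. Buffers the filter leaves out, or that are dropped on error or cancellation, reach the doOnDiscard hook. The hook is placed at the end of the chain so that it covers the operators upstream of it.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;

class ReleasingDecoderSketch {

    // Hypothetical decode step: drops empty buffers, converts the rest to text.
    static Flux<String> decodeNonEmpty(Flux<DataBuffer> input) {
        return input
                // filter leaves out items: dropped buffers go to the discard hook.
                .filter(buffer -> buffer.readableByteCount() > 0)
                .map(buffer -> {
                    try {
                        // Rule 1: read the buffer fully, then release it immediately.
                        return buffer.toString(StandardCharsets.UTF_8);
                    }
                    finally {
                        DataBufferUtils.release(buffer);
                    }
                })
                // Rule 2: release buffers discarded by upstream operators,
                // including those dropped due to an error or cancellation.
                .doOnDiscard(DataBuffer.class, DataBufferUtils::release);
    }
}
```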

Note that DataBufferUtils#join offers a safe and efficient way to aggregate a data buffer stream into a single data buffer. Likewise skipUntilByteCount and takeUntilByteCount are additional safe methods for decoders to use.
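skipUntilByteCount and takeUntilByteCount can be chained to cut a byte window out of a buffer stream. This is a sketch assuming Spring Framework 5.3 or later plus Reactor; the class name and the chunks helper are hypothetical. Buffers that fall entirely inside the skipped or trailing region are released by these methods, and a buffer that straddles a boundary is split. Only the final joined buffer is released by the caller.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ByteCountExample {

    static final DataBufferFactory FACTORY = DefaultDataBufferFactory.sharedInstance;

    // Hypothetical helper: one DataBuffer per string part.
    static Flux<DataBuffer> chunks(String... parts) {
        return Flux.fromArray(parts)
                .map(part -> FACTORY.wrap(part.getBytes(StandardCharsets.UTF_8)));
    }

    // Drops the first `skip` bytes, keeps the next `take` bytes, and joins them.
    static Mono<String> slice(Flux<DataBuffer> source, long skip, long take) {
        Flux<DataBuffer> window = DataBufferUtils.takeUntilByteCount(
                DataBufferUtils.skipUntilByteCount(source, skip), take);
        return DataBufferUtils.join(window)
                .map(joined -> {
                    try {
                        return joined.toString(StandardCharsets.UTF_8);
                    }
                    finally {
                        DataBufferUtils.release(joined);
                    }
                });
    }
}
```

For example, slicing the chunks "abc", "def" and "ghi" with skip 2 and take 5 yields "cdefg". The first and last chunks are split at the byte boundaries.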

An Encoder allocates data buffers that others must read (and release). So an Encoder doesn't have much to do. However, an Encoder must take care to release a data buffer if a serialization error occurs while populating the buffer with data. For example:

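Java:

```java
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
	// Serialize the value and write it into the buffer. If this throws,
	// release is still true and the finally block frees the buffer.
	release = false;
}
finally {
	if (release) {
		DataBufferUtils.release(buffer);
	}
}
return buffer;
```

Kotlin:

```kotlin
val buffer = factory.allocateBuffer()
var release = true
try {
	// Serialize the value and write it into the buffer. If this throws,
	// release is still true and the finally block frees the buffer.
	release = false
} finally {
	if (release) {
		DataBufferUtils.release(buffer)
	}
}
return buffer
```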

The consumer of an Encoder is responsible for releasing the data buffers it receives. In a WebFlux application, the output of the Encoder is used to write to the HTTP server response, or to the client HTTP request, in which case releasing the data buffers is the responsibility of the code writing to the server response, or to the client request.
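A consumer that writes encoder output somewhere is where the release happens. This is a hypothetical sketch, assuming Spring Framework 5.3 or later plus Reactor, with an OutputStream standing in for the response or request body. DataBufferUtils.write(...) does not release the source buffers; it re-emits each one after writing it, so the consumer releases it with DataBufferUtils.releaseConsumer().

```java
import java.io.OutputStream;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class WriteAndRelease {

    // Hypothetical consumer of Encoder output: writes each buffer, then releases it.
    static Mono<Void> writeTo(Flux<DataBuffer> encoded, OutputStream out) {
        return DataBufferUtils.write(encoded, out)
                // Each buffer is emitted after it has been written; release it now.
                .doOnNext(DataBufferUtils.releaseConsumer())
                .then();
    }
}
```

Note that DataBufferUtils.write(...) does not close the OutputStream; closing it remains the caller's job.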

Note that when running on Netty, there are debugging options for troubleshooting buffer leaks.
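One such option is Netty's own leak detector. This sketch assumes netty-common on the classpath, and the class and method names are hypothetical. Starting the JVM with -Dio.netty.leakDetection.level=paranoid has a similar effect to calling ResourceLeakDetector.setLevel early, before buffers are allocated. PARANOID tracks every buffer and has the highest overhead, so it is meant for debugging and tests. Netty then logs a "LEAK" report when a buffer is garbage-collected without having been released.

```java
import io.netty.util.ResourceLeakDetector;

class LeakDetectionSetup {

    // Call at startup, before any buffers are allocated, so that all
    // subsequently allocated buffers are tracked.
    static void enableParanoidLeakDetection() {
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
    }
}
```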