数据缓冲区和编解码器
Java NIO 提供了 ByteBuffer,但许多库在此基础上构建了自己的字节缓冲区 API,
特别是对于网络操作,其中重用缓冲区和/或使用直接缓冲区对性能有利。
例如,Netty 有 ByteBuf 层次结构,Jetty 使用带有回调机制的池化字节缓冲区来释放,等等。
spring-core 模块提供了一组抽象来处理各种字节缓冲区 API,具体如下:
-
DataBufferFactory抽象了数据缓冲区的创建。 -
DataBuffer表示一个字节缓冲区,它可能是 池化的。 -
DataBufferUtils提供了数据缓冲区的实用方法。 -
编解码器 将数据缓冲区流解码或编码为更高级别的对象。
DataBufferFactory
DataBufferFactory 用于通过以下两种方式之一创建数据缓冲区:
-
分配一个新的数据缓冲区,如果已知容量,可以提前指定容量,这更高效,尽管
DataBuffer的实现可以按需增长和收缩。 -
包装一个现有的
byte[]或java.nio.ByteBuffer,它用DataBuffer实现装饰给定的数据,并且不涉及分配。
请注意,WebFlux 应用程序不直接创建 DataBufferFactory,而是通过 ServerHttpResponse 或客户端的 ClientHttpRequest 访问它。
工厂的类型取决于底层的客户端或服务器,例如,Reactor Netty 使用 NettyDataBufferFactory,其他则使用 DefaultDataBufferFactory。
DataBuffer
DataBuffer 接口提供了与 java.nio.ByteBuffer 类似的操作,但也带来了一些额外的优势,其中一些灵感来自 Netty ByteBuf。
以下是部分优势列表:
-
读写位置独立,即不需要调用
flip()来在读写之间切换。 -
容量按需扩展,如
java.lang.StringBuilder。 -
通过
PooledDataBuffer实现池化缓冲区和引用计数。 -
将缓冲区视为
java.nio.ByteBuffer、InputStream或OutputStream。 -
确定给定字节的索引或最后一个索引。
PooledDataBuffer
如 ByteBuffer 的 Javadoc 中所解释, 字节缓冲区可以是直接的或非直接的。直接缓冲区可能位于 Java 堆之外,这消除了本机 I/O 操作的复制需求。 这使得直接缓冲区在通过套接字接收和发送数据时特别有用,但它们的创建和释放也更昂贵,这导致了池化缓冲区的想法。
PooledDataBuffer 是 DataBuffer 的扩展,有助于引用计数,这对于字节缓冲区池化至关重要。
它是如何工作的?当分配 PooledDataBuffer 时,引用计数为 1。调用 retain() 会增加计数,而
调用 release() 会减少计数。只要计数大于 0,就保证缓冲区不会被释放。
当计数减少到 0 时,池化缓冲区可以被释放,这在实践中可能意味着为缓冲区保留的内存返回到内存池。
请注意,在大多数情况下,最好使用 DataBufferUtils 中的便捷方法,这些方法仅当 DataBuffer 是 PooledDataBuffer 的实例时才对其应用释放或保留操作,而不是直接操作 PooledDataBuffer。
DataBufferUtils
DataBufferUtils 提供了许多用于操作数据缓冲区的实用方法:
-
将数据缓冲区流连接成一个单一的缓冲区,可能零拷贝,例如,如果底层字节缓冲区 API 支持,则通过复合缓冲区实现。
-
将
InputStream或 NIOChannel转换为Flux<DataBuffer>,反之亦然,将Publisher<DataBuffer>转换为OutputStream或 NIOChannel。 -
如果缓冲区是
PooledDataBuffer的实例,则提供释放或保留DataBuffer的方法。 -
从字节流中跳过或获取直到达到特定的字节数。
编解码器
org.springframework.core.codec 包提供了以下策略接口:
-
Encoder用于将Publisher<T>编码为数据缓冲区流。 -
Decoder用于将Publisher<DataBuffer>解码为更高级别的对象流。
spring-core 模块提供了 byte[]、ByteBuffer、DataBuffer、Resource 和
String 编码器和解码器实现。spring-web 模块增加了 Jackson JSON、
Jackson Smile、JAXB2、Protocol Buffers 以及其他编码器和解码器。请参阅
WebFlux 部分中的 编解码器。
使用 DataBuffer
在使用数据缓冲区时,必须特别注意确保缓冲区被释放,因为它们可能是 池化的。我们将使用编解码器来说明 这是如何工作的,但这些概念更具普遍性。让我们看看编解码器在内部必须如何管理数据缓冲区。
Decoder 是最后一个读取输入数据缓冲区(在创建更高级别对象之前)的组件,
因此它必须按如下方式释放它们:
-
如果
Decoder只是读取每个输入缓冲区并准备立即释放它,它可以通过DataBufferUtils.release(dataBuffer)来实现。 -
如果
Decoder正在使用Flux或Mono运算符(如flatMap、reduce等,它们在内部预取和缓存数据项), 或者正在使用filter、skip等运算符(它们会遗漏项),则必须将doOnDiscard(DataBuffer.class, DataBufferUtils::release)添加到组合链中, 以确保这些缓冲区在被丢弃之前(可能由于错误或取消信号)得到释放。 -
如果
Decoder以任何其他方式持有或多个数据缓冲区,则必须确保它们在完全读取后被释放, 或者在缓存的数据缓冲区被读取和释放之前发生错误或取消信号的情况下被释放。
请注意,DataBufferUtils#join 提供了一种安全有效的方式将数据缓冲区流聚合到单个数据缓冲区中。
同样,skipUntilByteCount 和 takeUntilByteCount 是供解码器使用的其他安全方法。
Encoder 分配数据缓冲区,其他人必须读取(并释放)这些缓冲区。因此 Encoder
没有太多事情要做。但是,如果由于填充缓冲区数据时发生序列化错误,Encoder
必须注意释放数据缓冲区。例如:
-
Java
-
Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Encoder 的消费者负责释放它接收到的数据缓冲区。
在 WebFlux 应用程序中,Encoder 的输出用于写入 HTTP 服务器响应或客户端 HTTP 请求,
在这种情况下,释放数据缓冲区是写入服务器响应或客户端请求的代码的责任。
请注意,在 Netty 上运行时,有关于 故障排除缓冲区泄漏 的调试选项。