为什么需要发送和接收缓存

Posted on Wed 30 September 2020 in Journal

基于网络的应用程序都需要将接收到的数据先放入缓冲区,等一个数据包完整接收到了再传递给应用层。 大家都知道TCP是面向字节流的,发送方 send 了 n 字节,但接收方并不知道一次 read 操作收到了多少字节,可能是1,可能是n, 也可能是n-x 或 n+x (x 未知)。

发送数据也是一样,一个数据包可能只发送了一部分,剩余的放在缓冲区中在 socket 端口可写时通过 on_write 回调函数中继续发送。

这里缓冲区的设计就很有讲究,尽量避免不必要的内存分配和复制,以提高性能。它可以是一个字符队列:

  • 发送方缓冲:从队尾追加数据,从队头取出数据发送到 socket
  • 接收方缓冲:从队头取出数据,从队尾接收从 socket 中的数据

最简单的方法就是开辟一块内存,比如一个大数组为缓冲区,设置一个读指针 readIndex,从readIndex 位置开始读一直读到 writeIndex, 一个写指针writeIndex,数据从writeIndex 开始写一直写到 capacity。

著名的 C++ 网络编程框架 ACE 中就有 ACE_Message_Block 的设计

ACE_Message_Block

ACE_Message_Block 主要有读指针,写指针,数据块(ACE_Data_Block), 和连接指针(指向下一个消息体),这样就会将收到的数据串成一个链表。

再以 Netty 中的 ByteBuf 为例详细了解一下其设计思想

ByteBuf

1)0 ~ readIndex 为无效区域 2)readIndex ~ writeIndex 为可读区域 3)writeIndex ~ capacity 为可写区域 4)capacity ~ maxCapacity 为可扩容区域

具体实现类为 AbstractByteBuf 的各个子类,主要区别在于是不是使用了内存池,是不是在堆内

AbstractByteBuf

内存区域主要分两类:

  1. 堆内内存: heap 堆内存
  2. 堆外内存: direct 或 native 内存
ByteBuf 实现 内存池中? 安全? 堆内?
PooledHeapByteBuf Y Y Y
PooledUnsafeHeapByteBuf Y N Y
PooledDirectByteBuf Y Y N
PooledUnsafeDirectByteBuf Y N N
UnpooledHeapByteBuf N Y Y
UnpooledUnsafeHeapByteBuf N N Y
UnpooledDirectByteBuf N Y N
UnpooledUnsafeDirectByteBuf N N N

主要方法有

方法 说明
capacity() 容量=废弃的字节数+可读字节数+可写字节数
maxCapacity() ByteBuf 最大所能容纳的最大字节数
isWritable() ByteBuf 是否可写, capacity() > writerIndex
writeBytes(byte[] src) 写入字节
isReadable() ByteBuf 是否可写, writerIndex > readerIndex
readBytes(byte[] dst) 读取字节

内存的分配是交由 ByteBufAllocator 来分配的

ByteBufAllocator

写段代码演示一下

    public static void printBufferIndex(ByteBuf buffer, String message) {

        log.info("# {} -> buffer: {},  readableTypes {}, writableBytes: {}, capacity: {}",
                message, buffer,  buffer.readableBytes(), buffer.writableBytes(), buffer.capacity());
    }

    @Test
    public void testByteBuf() {

        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(12, 16);

        buffer.writeBytes(new byte[] { 1 , 2, 3, 4, 5, 6});
        printBufferIndex(buffer, "write 6 bytes");
        assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 6);

        buffer.writeBytes(new byte[] {  7, 8, 9, 10, 11 ,12, 13, 14, 15, 16});
        printBufferIndex(buffer, "write 12 bytes");
        assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 16);

        int size = buffer.readableBytes();
        byte[] output = new byte[size];
        buffer.readBytes(output);
        printBufferIndex(buffer, String.format("read %d bytes", size));
        assertTrue(buffer.readerIndex() == 16 && buffer.writerIndex() == 16);

        buffer.discardReadBytes();
        printBufferIndex(buffer, "discardReadBytess");
        assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 0);
    }

执行结果如下

# 先写6个字节,readIndex = 0,   writeIndex = 6
write 6 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 12/16),  readableTypes 6, writableBytes: 6, capacity: 12
# 再写10个字节,readIndex = 0,   writeIndex = 16
write 12 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16/16),  readableTypes 16, writableBytes: 0, capacity: 16
# 再读16个字节,readIndex = 16,   writeIndex = 16
read 16 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 16, widx: 16, cap: 16/16),  readableTypes 0, writableBytes: 0, capacity: 16
# 已经读过的字节丢弃掉,readIndex = 0,   writeIndex = 0
discardReadBytess -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 16/16),  readableTypes 0, writableBytes: 16, capacity: 16

零拷贝

除了通过读写指针来减少内存的复制,Netty 还应用了如下的技术来提高性能

  • Netty 接收及发送 ByteBuffer 用 DirectBuffer, 使用堆外直接内存进行 socket 读写,不需要进行字节缓冲区的二次拷贝

  • Netty 使用 ComposeByteBuffer ,可以聚合多个 ByteBuffer 对象,不需要通过内存拷贝的方式来合并几个小的 ByteBuffer 到一个大的 ByteBuffer

  • Netty 对于文件传输采用了 transferTo 方法,可以直接将文件缓冲区的数据发送到目标 Channel, 避免了通过循环 write() 的方式进行内存拷贝