环形缓冲区与生产者消费者模式

  1. 1. 环形缓冲区

首先声明,生产者消费者模式不属于设计模式里的一种。

最近一直在想要不要把游戏服务器的发送用发送队列,偶然搜了一下ConcurrentBag这个类,在博客中看见了生产者消费者模式。

服务器给客户端发送消息的过程就是典型的生产者消费者模式,服务器发消息的过程就是把消息放在队列里,另开启一个线程去遍历这个队列向客户端发消息。

其中服务器把消息放到队列的线程可看做生产者,取出消息的线程可看做消费者,其中还需要一个缓冲区存放消息的队列。如果生产消息后即发送即是同步发送,但放到队列里就可以不用等待其发送完成,这样解耦了生产消息和发送。

其中表示缓冲区的队列可用数组,队列等数据结构,用户向服务器发消息就存到缓冲区里。缓冲区的主要作用有粘包和拆包。服务器收到消息后存到缓冲区,如果消息长度足够就读取消息,然后用向前移动这个字节数组,覆盖掉已经读取的消息以达到复用缓冲区的目的。如果消息长度不够就等待下次发送,直到消息长度足够。

环形缓冲区

之前在github下载的一个游戏服务器代码里看见了环形缓冲区,环形缓冲区相比于普通缓冲区可以去掉未读数据前移这一步。

环形缓冲区的实现里我觉得最好的就是,记录读总数和写总数,写总数减去读总数就是未读消息长度。写下标和读下标可通过取余得到。

据说环形缓冲区最好存放值数据,如果存放对象的引用,还会涉及到对象的销毁等东西。

public class RingBuffer
{
    // 缓冲区大小,必须为2的幂
    private int BufferSize { get; set; }
    
    //缓冲区最大长度
    private int MaxBufferSize { get; set; }
    
    // 缓冲区本区
    private byte[] Bytes { get; set; }
    
    // 读总数
    private uint _readCount;
    
    // 写总数
    private uint _writeCount;
    
    // 写下标
    private int WriteIdx => (int) (_writeCount & (uint) (BufferSize - 1));
    
    // 读下标
    private int ReadIdx => (int) (_readCount & (uint) (BufferSize - 1));
    
    // 缓冲区未读数据长度
    public int DataLength => (int) (_writeCount - _readCount);
    
    private readonly object _lockObj = new();

    /// <summary>
    /// 构造函数,初始化缓冲区,设置缓冲区大小
    /// </summary>
    /// <param name="size">缓冲区大小,必须为2的幂</param>
    /// <param name="maxBufferSize"></param>
    public RingBuffer(int size = 2048, int maxBufferSize = 8192)
    {
        Bytes = new byte[size];
        BufferSize = size;
        MaxBufferSize = maxBufferSize;
        _readCount = 0;
        _writeCount = 0;
    }
    
    /// <summary>
    /// 向环形缓冲区中写给定数组
    /// </summary>
    /// <param name="srcBytes">要写入缓冲区的数组</param>
    /// <param name="srcOffset">给定数组的偏移量</param>
    /// <param name="count">写入长度</param>
    /// <returns>是否写入完毕</returns>
    public bool Write(byte[] srcBytes, int srcOffset, int count)
    {
        if (srcBytes == null || srcBytes.Length == 0) return false;
        if (srcOffset > srcBytes.Length) return false;
        // lock (_lockObj)//只有一个写和一个读线程的情况下,这里锁就可以注释掉
        {
            if (BufferSize - DataLength <= count)
            {
                //扩展缓冲区
                var reSize = ReSize(count);
                if (!reSize)
                {
                    return false;
                }
            }

            var remainingLength = BufferSize - WriteIdx;
            if (remainingLength > count)
            {
                Buffer.BlockCopy(srcBytes, srcOffset, Bytes, WriteIdx, count);
            }
            else
            {
                Buffer.BlockCopy(srcBytes, srcOffset, Bytes, WriteIdx, remainingLength);
                Buffer.BlockCopy(srcBytes, remainingLength, Bytes, 0, count - remainingLength);
            }

            Interlocked.Add(ref _writeCount, (uint) count);
        }

        return true;
    }

    public byte[] GetRead(int count)
    {
        var destination = new byte[count];
        lock (_lockObj)
        {
            if (DataLength < count) return null;
            
            var remainingLength = BufferSize - ReadIdx;
            if (remainingLength > count)
            {
                Buffer.BlockCopy(Bytes, ReadIdx, destination, 0, count);
            }
            else
            {
                Buffer.BlockCopy(Bytes, ReadIdx, destination, 0, remainingLength);
                Buffer.BlockCopy(Bytes, 0, destination, remainingLength, count - remainingLength);
            }

            Interlocked.Add(ref _readCount, (uint) count);
            return destination;
        }
    }
    
    /// <summary>
    /// 扩展缓冲区,必须以2的幂扩展
    /// </summary>
    /// <param name="count"></param>
    private bool ReSize(int count)
    {
        if (BufferSize >= MaxBufferSize)
        {
            return false;
        }

        lock (_lockObj)
        {
            if (count < BufferSize - DataLength)
            {
                return false;
            }

            var size = BufferSize;
            while (count >= size - DataLength)
            {
                size = BufferSize * 2;
            }

            var newBytes = new byte[size];
            if (WriteIdx > ReadIdx || DataLength == 0)
            {
                Buffer.BlockCopy(Bytes, ReadIdx, newBytes, 0, DataLength);
            }
            else
            {
                var remainingLength = BufferSize - ReadIdx;
                Buffer.BlockCopy(Bytes, ReadIdx, newBytes, 0, remainingLength);
                Buffer.BlockCopy(Bytes, 0, newBytes, remainingLength, DataLength - remainingLength);
            }

            Bytes = newBytes;
            BufferSize = size;
            return true;
        }
    }
}

大致实现就是这样,每次有消息过来就写在缓冲区里,增加写总数,读取就增加读总数,写总数减去读总数就是未读消息大小。

用uint声明写总数和读总数为了使写总数与读总数超过uint范围时,此时相减还能得到未读消息大小。

缓冲区大小必须为2的幂是因为,写下标和读下标可直接用与运算得到下标值,避免了使用%取余。

单生产者单消费者的情况,也就是只有一个写和一个读线程的情况下,写缓冲区方法里那个锁可以去掉。

2021年09月15日 19:30