生产者消费者模式

  1. 1. 环形缓冲区

最近一直在想要不要把游戏服务器的发送用发送队列,偶然搜了一下ConcurrentBag这个类,在博客中看见了生产者消费者模式,于是看了看。发现服务器给客户端发送消息的过程就是典型的生产者消费者模式,服务器发消息的过程:把消息放在队列里,另开启一个线程去遍历这个队列向客户端发消息。

其中服务器把消息放到队列的线程可看做生产者,取出消息的线程可看做消费者,其中存放消息的队列就是缓冲区。如果生产消息后即发送是同步发送,但放到队列里就可以不用等待其发送完成,这样解耦了生产消息和发送。

其中表示缓冲区的队列可用数组,栈,队列等数据结构,我目前给每个用户建立了1024字节数组的缓冲区,用户向服务器发消息就存到缓冲区里,缓冲区的作用涉及到了粘包和拆包,服务器收到消息后读取缓冲区,如果消息长度足够就读取消息,然后用Array.Copy()移动这个字节数组,覆盖掉已经读取的消息以达到复用缓冲区的目的,如果消息长度不够就等待下次发送,直到消息长度足够。

环形缓冲区

之前在github下载的一个游戏服务器代码里看见了环形缓冲区,我认为环形缓冲区相比于普通缓冲区可以去掉Array.Copy()也就是未读数据前移这一步。通过写下标,读下标得知从哪里写和目前读到哪里,通过每个消息的前4个字节得知该消息长度,通过一个维护的变量得知缓冲区还有多少未读消息长度,所以可直接覆盖掉原来的数据,从而省去将未读数据前移的步骤。其中缓冲区长度需要已知,设置个2048字节应该够用了。

目前我弄的游戏服务器发送貌似一直是同步发送,用了SocketAsyncEventArgs,每生成一个消息就赋值到这个类的缓冲区,之后调用SendAsync方法,但感觉还是放到发送队列比较好。
据说环形缓冲区最好存放值数据,如果存放对象的引用,还会涉及到对象的销毁等东西。

public class MsgBuffer
{
    // 缓冲区大小,必须为2的幂
    private int BufferSize { get; set; }

    // 缓冲区本区
    private byte[] Bytes { get; set; }

    // 读总数
    private uint _readCount;

    // 写总数
    private uint _writeCount;

    // 写下标
    private int WriteIdx => (int) (_writeCount & (uint) (BufferSize - 1));

    // 读下标
    private int ReadIdx => (int) (_readCount & (uint) (BufferSize - 1));

    // 缓冲区未读数据长度
    public int DataLength => (int) (_writeCount - _readCount);

    private readonly object _lockObj = new();

    public MsgBuffer(int size = 1024)
    {
        Bytes = new byte[size];
        BufferSize = size;
        _readCount = 0;
        _writeCount = 0;
    }
    
    /// <summary>
    /// 向环形缓冲区中写给定数组
    /// </summary>
    /// <param name="srcBytes">要写入缓冲区的数组</param>
    /// <param name="srcOffset">给定数组的偏移量</param>
    /// <param name="count">写入长度</param>
    /// <returns>已经写入的长度</returns>
    public int Write(byte[] srcBytes, int srcOffset, int count)
    {
        if (srcBytes == null || srcBytes.Length == 0) return 0;
        if (srcOffset > srcBytes.Length) return 0;
        
        lock (_lockObj)
        {
            if (BufferSize - DataLength <= count)
            {
                //扩展缓冲区
                ReSize(count);
            }

            var remainingLength = BufferSize - WriteIdx;
            if (remainingLength > count)
            {
                Buffer.BlockCopy(srcBytes, srcOffset, Bytes, WriteIdx, count);
            }
            else
            {
                Buffer.BlockCopy(srcBytes, srcOffset, Bytes, WriteIdx, remainingLength);
                Buffer.BlockCopy(srcBytes, remainingLength, Bytes, 0, count - remainingLength);
            }

            Interlocked.Add(ref _writeCount, (uint) count);
        }
        
        return count;
    }

    public byte[] GetRead(int count)
    {
        var destination = new byte[count];
        lock (_lockObj)
        {
            if (DataLength < count) return null;
            
            var remainingLength = BufferSize - ReadIdx;
            if (remainingLength > count)
            {
                Buffer.BlockCopy(Bytes, ReadIdx, destination, 0, count);
            }
            else
            {
                Buffer.BlockCopy(Bytes, ReadIdx, destination, 0, remainingLength);
                Buffer.BlockCopy(Bytes, 0, destination, remainingLength, count - remainingLength);
            }

            Interlocked.Add(ref _readCount, (uint) count);
            return destination;
        }
    }
}

大致实现就是这样,每次有消息过来就写在缓冲区里,增加写总数,读取就增加读总数,写总数减去读总数就是未读消息大小。用uint声明写总数和读总数为了使写总数与读总数超过uint范围时,此时相减还能得到未读消息大小。
缓冲区大小必须为2的幂是因为,写下标和读下标可直接用与运算得到下标值,避免了使用%取余。

单生产者单消费者的情况,也就是只有一个写和一个读线程的情况下,这个锁可以去掉。

2021年09月15日 19:30