大数跨境
0
0

06. 应用层通信协议包如何分片

06.  应用层通信协议包如何分片 CppGuide
2024-01-10
0

关注我,更多的优质内容第一时间阅读


我们这里说的包分片,指的是应用层的对包的拆分。当一个包的数据较大,超过一个包的最长长度时,我们需要对包进行分片。有的读者可能会有疑问:分成多个包就行了,为什么要对包进行分片?在实际应用中,一般会根据业务需求对包的类型进行编号,例如使用一个 wCmd 表示业务号,但某些业务类型某次携带的数据可能比较大,超过了单个包的最大长度,这个时候我们需要将该数据拆分成多个包片,但其业务号隶属于同一个包,这就是所谓的”包分片“。

在理解了包分片的原理后,设计包分片功能也很简单了。这里提供两种包分片的思路。

  1. 设置分片标志

    包头部分设置一个字段表示当前包是否属于某个大包的分片,分片标志字段一般有 4 种取值类型:无分片标志、包的第一个分片标志、包的最后一个分片标志、第一个分片与最后一个分片之间的包分片标志。

  2. 每个包分片的包头部分有该包的总分片数目和当前分片编号。

对于 TCP 协议来说,由于其数据传输本身是有序的,因此多个分片,只要我们一端按顺序依次发送,另外一端收包时一定会按发送的顺序收到。因此,我们不用考虑包分片的顺序问题。

我们来看一个具体的包分片的例子:

假设现在有如下协议头定义:

//与客户端交互协议包头
#pragma pack(push, 1)
typedef struct tagNtPkgHead
{
    unsigned char   bStartFlag;     //协议包起始标志 0xFF
    unsigned char   bVer;           //版本号
    unsigned char   bEncryptFlag;   //加密标志(如果不加密,则为0)
    unsigned char   bFrag;          //是否有包分片(1 有包分片 0 无包分片)
    unsigned short  wLen;           //总包长
    unsigned short  wCmd;           //命令号
    unsigned short  wSeq;           //包的序列号,业务使用
    unsigned short  wCrc;           //Crc16校验码
    unsigned int    dwSID;          //会话ID
    unsigned short  wTotal;         //有包分片时,分片总数
    unsigned short  wCurSeq;        //有包分片时,分片序号,从0开始,无分片时也为0
} NtPkgHead, *PNtPkgHead;
#pragma pack(pop)

对端在处理包分片的逻辑伪码如下:

UINT CSocketClient::RecvDataThreadProc(LPVOID lpParam)
{
    LOG_NORMAL("Start recv data thread.");
    DWORD           dwWaitResult;
    std::string     strPkg;
    //临时存放一个完整的包数据的变量
    std::string     strTotalPkg;
    unsigned short  uPkgLen = 0;
    unsigned int    uBodyLen = 0;
    unsigned int    uTotalPkgLen = 0;
    unsigned int    uCmd = 0;
    NtPkgHead       pkgHead;
    unsigned short  uTotal = 0;
    //记录上一次的包分片序号,包分片序号从0开始
    unsigned short  uCurSeq = 0;
    int             nWaitTimeout = 1;

    CSocketClient* pSocketClient = (CSocketClient*)lpParam;

    while (!m_bExit)
    {      
        //检测是否有数据
        if (!pSocketClient->CheckReceivedData())
        {
            //休眠10豪秒
            Sleep(10);
            continue;
        }
            
        //接收数据,并放入pSocketClient->m_strRecvBuf中
        if (!pSocketClient->Recv())
        {
            LOG_ERROR("Recv data error");
                
            //收数据出错,清空接收缓冲区,可以做一些关闭连接、重连等动作,
            pSocketClient->m_strRecvBuf.clear();

            Reconnect();
            continue;
        }

        //一定要放在一个循环里面解包,因为当前缓冲区中可能存在多个数据包
        while (true)
        {
            //判断当前收到的数据是否够一个包头大小
            if (pSocketClient->m_strRecvBuf.length() < sizeof(NtPkgHead))
                break;

            memset(&pkgHead, 0, sizeof(pkgHead));
            memcpy_s(&pkgHead, sizeof(pkgHead), pSocketClient->m_strRecvBuf.c_str(), sizeof(pkgHead));
            
            //对包消息头检验
            if (!CheckPkgHead(&pkgHead))
            {
                //如果包头检验不通过,缓冲区里面的数据已经是脏数据了,直接清空掉,
                //可以做一些关闭连接并重连的动作             
                LOG_ERROR("Check package head error, discard data %d bytes", (int)pSocketClient->m_strRecvBuf.length());
                
                pSocketClient->m_strRecvBuf.clear();

                Reconnect();
                break;
            }

            //判断当前数据是否够一个整包的大小
            uPkgLen = ntohs(pkgHead.wLen);
            if (pSocketClient->m_strRecvBuf.length() < uPkgLen)
                break;

            strPkg.clear();
            strPkg.append(pSocketClient->m_strRecvBuf.c_str(), uPkgLen);

            //从收取缓冲区中移除已经处理的数据部分
            pSocketClient->m_strRecvBuf.erase(0, uPkgLen);

            uTotal = ::ntohs(pkgHead.wTotal);
            uCurSeq = ::ntohs(pkgHead.wCurSeq);
            //无分片或第一个分片
            if (uCurSeq == 0)
            {
                strTotalPkg.clear();
                uTotalPkgLen = 0;
            }

            uBodyLen = uPkgLen - sizeof(NtPkgHead);
            uTotalPkgLen += uBodyLen;
            strTotalPkg.append(strPkg.data() + sizeof(NtPkgHead), uBodyLen);

            //无分包 或 分包的最后一个包 则将组装后的包发送出去
            if (uTotal == 0 || (uTotal != 0 && uTotal == uCurSeq + 1))
            {
                uCmd = ::ntohs(pkgHead.wCmd);

                //ProxyPackage是解析出来的业务包定义
                ProxyPackage proxyPackage;
                //拷贝业务号
                proxyPackage.nCmd = uCmd;
                //拷贝包长度
                proxyPackage.nLength = uTotalPkgLen;
                //拷贝包体内容
                proxyPackage.pszJson = new char[uTotalPkgLen];
                memset(proxyPackage.pszJson, 0, uTotalPkgLen * sizeof(char));
                memcpy_s(proxyPackage.pszJson, uTotalPkgLen, strTotalPkg.c_str(), strTotalPkg.length());

                //将一个完整的包交给业务处理
                pSocketClient->m_pNetProxy->AddPackage((const char*)&proxyPackage, sizeof(proxyPackage));
            }
        }// end inner-while-loop


    }// end outer-while-loop


    LOG_NORMAL("Exit recv data thread.");

    return 0;
}

上述代码在一个网络数据收取线程中,先检测是否有可读数据,如果有可读数据,则从 socket 上读取该数据存入接收缓冲区 pSocketClient->m_strRecvBuf 中,然后判断收到的数据是否够一个包头的大小(sizeof(NtPkgHead)),如果不够,退出当前循环等待后续数据到来;如果够,对包头数据进行校验后,从包头中得到整包的大小(ntohs(pkgHead.wLen))(这里表示整包的大小的字段 wLen 使用了网络字节序,我们调用 ntohs() 函数得到本机字节序);然后判断收到的数据是否够一个整包的大小,如果不够,退出当前循环等待后续数据到来;如果够,则根据记录当前包分片序号的变量 uCurSequCurSeq = ::ntohs(pkgHead.wCurSeq))来确定该包是否是某个分片,uCurSeq 等于 0 时说明此次从一个新的包片或完整的包开始的;从接收缓冲区中将当前包片或者完整包的数据放入变量 strTotalPkg 中存储起来(注意 pkgHead.wTotalpkgHead.wCurSeq 均使用了网络字节序,需要转换成本地字节序)。接着,根据包头字段 pkgHead.wTotalpkgHead.wCurSeq 转换成本机字节序的值判断这是否是一个完整的包(当 uTotal == 0 时)或者是最后一个包分片(当 uTotal != 0 && uTotal == uCurSeq + 1 时),此时 strTotalPkg 存放的就是一个完整的包数据了,接着将其拷贝出来(这里是拷贝至 ProxyPackage 结构中),进行业务逻辑处理。如果当前包片只是一个大包的中间包片,则继续进行下一轮数据的处理。strTotalPkg 中存放的数据达到一个完整的包时会在业务处理后、下一轮循环存入新的包片数据前清空掉(代码第 81 行)。

上述流程可用如下流程图表示:


关注我,更多的优质内容第一时间阅读


更多的专题参见:cppguide.cn


技术交流群:加微信 cppxiaofang,备注加微信群。


付费课程

C/C++ 网络编程实战训练营 一期录像

C/C++ 网络编程二期录像 限时特惠

【声明】内容源于网络
0
0
CppGuide
专注于高质量高性能C++开发,站点:cppguide.cn
内容 981
粉丝 0
CppGuide 专注于高质量高性能C++开发,站点:cppguide.cn
总阅读2
粉丝0
内容981