大数跨境
0
0

08. 高质量网络库如何分层设计

08. 高质量网络库如何分层设计 CppGuide
2024-01-22
2

关注我,更多的优质内容第一时间阅读


计算机界有一句经典名言:“计算机科学领域的任何问题都可以通过增加一个间接的中间层来解决”(Any problem  in computer science can be solved by anther layer of indirection.)。这句话几乎概括了计算机系统软件体系结构的设计要点,整个体系结构从上到下都是按照严格的层次结构设计的,不仅是计算机系统软件整个体系是这样的,体系里面的每个组件比如 OS 本身,很多应用程序、软件系统甚至很多硬件结构都是按照这种层次的结构组织和设计的。

8.1 网络库设计中的各个层

在常见的网络通信库中,根据功能也可以分成很多层,根据离业务的远近从上到下依次是:

  • Session 层

    该层处于最上层,在设计上不属于网络框架本身的部分,其作用是记录各种业务状态数据和处理各种业务逻辑。业务逻辑处理完毕后,如果需要进行网络通信,则依赖 Connection 层进行数据收发。

    例如一个 session 类可能有如下接口和成员数据:

    class ChatSession
    {
    public:
        ChatSession(const std::shared_ptr<TcpConnection>& conn, int sessionid);
        virtual ~ChatSession();
        
        int32_t GetSessionId()
        {
            return m_id;
        }

        int32_t GetUserId()
        {
            return m_userinfo.userid;
        }

        std::string GetUsername()
        {
            return m_userinfo.username;
        }

        std::string GetNickname()
        {
            return m_userinfo.nickname;
        }

        std::string GetPassword()
        {
            return m_userinfo.password;
        }

        int32_t GetClientType()
        {
            return m_userinfo.clienttype;
        }

        int32_t GetUserStatus()
        {
            return m_userinfo.status;
        }

        int32_t GetUserClientType()
        {
            return m_userinfo.clienttype;
        }

        void SendUserStatusChangeMsg(int32_t userid, int type, int status = 0);

    private:
     //各个业务逻辑处理方法
        bool Process(const std::shared_ptr<TcpConnection>& conn, const char* inbuf, size_t buflength);
        
        void OnHeartbeatResponse(const std::shared_ptr<TcpConnection>& conn);
        void OnRegisterResponse(const std::string& data, const std::shared_ptr<TcpConnection>& conn);
        void OnLoginResponse(const std::string& data, const std::shared_ptr<TcpConnection>& conn);
        void OnGetFriendListResponse(const std::shared_ptr<TcpConnection>& conn);
        void OnFindUserResponse(const std::string& data, const std::shared_ptr<TcpConnection>& conn);
        void OnChangeUserStatusResponse(const std::string& data, const std::shared_ptr<TcpConnection>& conn);
        void OnOperateFriendResponse(const std::string& data, const std::shared_ptr<TcpConnection>& conn);
        
        std::shared_ptr<TcpConnection> GetConnectionPtr()
        {
            if (tmpConn_.expired())
                return NULL;

            return tmpConn_.lock();
        }
     //调用下层Connection层发送数据的方法
        void Send(int32_t cmd, int32_t seq, const std::string& data);
        void Send(int32_t cmd, int32_t seq, const char* data, int32_t dataLength);
        void Send(const std::string& p);
        void Send(const char* p, int32_t length);

    private:
        int32_t           m_id;                 //session id
        OnlineUserInfo    m_userinfo;
        int32_t           m_seq;                //当前Session数据包序列号
        bool              m_isLogin;            //当前Session对应的用户是否已经登录
        
        //引用下层Connection层的成员变量
        std::weak_ptr<TcpConnection>    tmpConn_;
    };

    上述代码中除了业务状态数据和业务接口以外,还有一个 Send 系列的函数,这个函数依赖 Connection 对象进行数据收发。但是需要注意的是 Session 对象并不拥有 Connection 对象,也就是说 Session 对象不控制 Connection 对象的生命周期,这是因为虽然 Session 对象的主动销毁(如收到客户端不合理的数据,关闭 Session 对象)会引起 Connection 对象的销毁,但 Connection 对象本身也可能因为网络出错等原因被销毁,进而引起 Session 对象被销毁。因此,上述类接口描述中,ChatSession 类使用了一个弱指针(weak_ptr)来引用 TCPConnection 对象。这是需要注意的地方。

  • Connection 层

    该层是网络框架设计中最上面的一层(技术层的最上层),每一路客户端连接对应一个 Connection 对象。一般用于记录该路连接的各种状态,常见的状态信息有,如连接状态、数据收发缓冲区信息、数据流量记录状态、本端和对端地址和端口号信息等,同时也提供对各种网络事件的处理接口,这些接口或被本层自己使用,或被 Session 层使用。Connection 持有一个 Channel 对象,且掌管着 Channel 对象的生命周期。

    一个 Connection 对象可能提供的接口和记录的数据状态如下:

    class TcpConnection
    {
     public:  
      TcpConnection(EventLoop* loop,
                   const string& name,
                   int sockfd,
                   const InetAddress& localAddr,
                   const InetAddress& peerAddr);
      ~TcpConnection();

      const InetAddress& localAddress() const { return localAddr_; }
      const InetAddress& peerAddress() const { return peerAddr_; }
      bool connected() const { return state_ == kConnected; }

      
      void send(const void* message, int len);
      void send(const string& message);  
      void send(Buffer* message);  // this one will swap data
      void shutdown(); // NOT thread safe, no simultaneous calling
      
      void forceClose();

      void setConnectionCallback(const ConnectionCallback& cb)
      {
       connectionCallback_ = cb;
      }

      void setMessageCallback(const MessageCallback& cb)
      {
       messageCallback_ = cb;
      }
      
      void setCloseCallback(const CloseCallback& cb)
      {
       closeCallback_ = cb;
      }
      
      void setErrorCallback(const ErrorCallback& cb)
      {
       errorCallback_ = cb;
      }

      Buffer* inputBuffer()
      {
       return &inputBuffer_;
      }

      Buffer* outputBuffer()
      {
       return &outputBuffer_;
      }

     private:
      enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
      void handleRead(Timestamp receiveTime);
      void handleWrite();
      void handleClose();
      void handleError();
      void sendInLoop(const string& message);
      void sendInLoop(const void* message, size_t len);
      void shutdownInLoop();
      void forceCloseInLoop();
      void setState(StateE s) { state_ = s; }

        private:
      //连接状态信息
      StateE                      state_;
      //引用Channel对象
      std::shared_ptr<Channel>    channel_;
      //本端的地址信息
      const InetAddress           localAddr_;
      //对端的地址信息
      const InetAddress           peerAddr_;
      
      ConnectionCallback          connectionCallback_;
      MessageCallback             messageCallback_;
      CloseCallback               closeCallback_;
      ErrorCallback    errorCallback_; 
      
      //接收缓冲区
      Buffer                      inputBuffer_;
      //发送缓冲区
      Buffer                      outputBuffer_; 
      //流量统计类
      CFlowStatistics    flowStatistics;
    };
  • Channel 层

    Channel 层一般持有一个 socket(Linux 下也叫 fd),是实际进行数据收发的地方,因而一个 Channel 对象会记录当前需要监听的各种网络事件(读写和出错事件)状态,同时提供对这些事件的状态的判断和增删改的接口。在部分网络库的实现中,Channel 对象管理着 socket 对象的生命周期,而另外一些库的实现则由 Connection 对象来管理 socket 的生命周期。如果实现是前者,则 Channel 对象也提供对 socket 进行创建和关闭的接口。由于 TCP 收发数据是全双工的(收发走独立的通道,互不影响),收发逻辑一般不会有什么依赖关系,但收发操作一般会在同一个线程中进行操作,这样的目的是为了防止收或发的过程中,改变了 socket 的状态,对另外一个操作产生影响。例如在一个线程中收数据时出错,关闭了连接,另外一个线程正在发送数据,这该情何以堪呢。

    一个 Channel 对象提供的函数接口和状态数据如下所示:

    class Channel
    {
    public:
     Channel(EventLoop* loop, int fd);
     ~Channel();

     void handleEvent(Timestamp receiveTime);
     
     int fd() const;
     int events() const;
     void set_revents(int revt);
     void add_revents(int revt);
     void remove_events();
     bool isNoneEvent() const;

     bool enableReading();
     bool disableReading();
     bool enableWriting();
     bool disableWriting();
     bool disableAll();

     bool isWriting() const { return events_ & kWriteEvent; }

    private:  
     const int                   fd_;
     //当前需要检测的事件
     int                         events_;
     //处理后的事件
     int                         revents_;   
    };
  • socket 层

    严格意义上说,并不存在所谓的 socket 层,这一层只是对常用的 socket 函数进行了一层封装,例如封装实现跨平台,方便上层(Channel 层或 Connection 层)使用。很多网络库没有这一层。例如下面就是对常用的 socket 函数的功能做了一层简单的封装:

    namespace sockets
    {  
            SOCKET createOrDie();
            SOCKET createNonblockingOrDie();

            void setNonBlockAndCloseOnExec(SOCKET sockfd);

            void setReuseAddr(SOCKET sockfd, bool on);
            void setReusePort(SOCKET sockfd, bool on);

      int  connect(SOCKET sockfd, const struct sockaddr_in& addr);
      void bindOrDie(SOCKET sockfd, const struct sockaddr_in& addr);
      void listenOrDie(SOCKET sockfd);
      int  accept(SOCKET sockfd, struct sockaddr_in* addr);
            int32_t read(SOCKET sockfd, void *buf, int32_t count);
    #ifndef WIN32
      ssize_t readv(SOCKET sockfd, const struct iovec *iov, int iovcnt);
    #endif
      int32_t write(SOCKET sockfd, const void *buf, int32_t count);
      void close(SOCKET sockfd);
      void shutdownWrite(SOCKET sockfd);

      void toIpPort(char* buf, size_t size, const struct sockaddr_in& addr);
      void toIp(char* buf, size_t size, const struct sockaddr_in& addr);
      void fromIpPort(const char* ip, uint16_t port, struct sockaddr_in* addr);

      int getSocketError(SOCKET sockfd);

      const struct sockaddr* sockaddr_cast(const struct sockaddr_in* addr);
      struct sockaddr* sockaddr_cast(struct sockaddr_in* addr);
      const struct sockaddr_in* sockaddr_in_cast(const struct sockaddr* addr);
      struct sockaddr_in* sockaddr_in_cast(struct sockaddr* addr);

      struct sockaddr_in getLocalAddr(SOCKET sockfd);
      struct sockaddr_in getPeerAddr(SOCKET sockfd);
    }

    在实际实践中,有的服务设计网络通信模块时会将 Connection 对象与 Channel 对象合并成一个对象,这取决于当前业务需要记录的技术上的数据的多少和技术上处理这些数据的复杂性高低。所以在某些服务代码中只看到 Connection 对象或者 Channel 对象请不要觉得奇怪。

    另外,对于服务器端程序,抛开业务本身,在技术层面上,我们需要管理许多的 Connection 对象,一般会使用一个叫 Server 对象(如 TcpServer)来集中管理,这是网络库本身需要处理好的部分。例如一个 TcpServer 对象可能提供的函数接口和状态数据如下:

    class TcpServer
    {
    public:
     typedef std::function<void(EventLoop*)> ThreadInitCallback;
     enum Option
     {
      kNoReusePort,
      kReusePort,
     };

     TcpServer(EventLoop* loop,
         const InetAddress& listenAddr,
         const std::string& nameArg,
         Option option = kReusePort);      
     ~TcpServer();  

     void addConnection(int sockfd, const InetAddress& peerAddr);  
     void removeConnection(const TcpConnection& conn);

     typedef std::map<string, TcpConnectionPtr> ConnectionMap;

    private:
     int                                             nextConnId_;
     ConnectionMap                                   connections_;
    };

对于客户端程序,同样也可以设计出一个 TCPClient 对象管理各个 Connector(连接器对象)。

对于 Session 对象来说,虽然与 Connection 对象一一对应,但在业务层(网络通信框架之外)需要有专门的类去管理这些 Session 对象的生命周期,我们一般把这个专门的类称之为 SessionManager 或者 SessionFactory。

8.2 Session 进一步分层

不同的服务,其业务可能千差万别,实际开发中我们可能根据业务场景,将 Session 层进一步拆分成多个层,每一层专注于其自己的业务逻辑。例如对于即时聊天服务器,我们可以将 Session 划分为两层:ChatSession、CompressionSession 和 TcpSession,ChatSession 专注于聊天业务本身的处理,CompressSession 负责数据的解压缩,TcpSession 用于将数据加工成网络层需要的格式或者将网络层送上来的数据还原成业务需要的格式(如数据装包和解包)。示意图如下:

8.3 连接信息与 EventLoop/Thread 的对应关系

综合各层对象,一个 socket(fd)只会对应一个channel 对象、一个 Connection 对象以及一个 Session 对象,这一组对象构成了一路连接信息(技术上加业务上的)。结合我们前面介绍了 one thread one loop 思想,每一路连接信息只能属于一个 loop,也就是只会属于某一个线程;但是,反过来,一个 loop 或者一个线程可以同时拥有多个连接信息。这就保证了我们只会在同一个线程里面去是处理特定的 socket 的收发事件。


相关阅读


小方目前在一家知名外企做 C++ 架构方面的工作。我建立 高质量 C++ 后端开发微信技术交流群,群里高手如云,不定期分享编程技术,也会提供一些求职和内推资源。


现在限时开放中,需要加群交流的同学可以加微信 cppxiaofang,备注“加群”。


关注我,更多的优质内容第一时间阅读



【声明】内容源于网络
0
0
CppGuide
专注于高质量高性能C++开发,站点:cppguide.cn
内容 1260
粉丝 0
CppGuide 专注于高质量高性能C++开发,站点:cppguide.cn
总阅读582
粉丝0
内容1.3k