当前位置 博文首页 > wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Tcp

    wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Tcp

    作者:[db:作者] 时间:2021-09-09 09:43

    TcpConnection是TCP网络库中最核心、最复杂的类,也是唯一使用shared_ptr管理的类,封装了一次TCP连接,但是它不能发起连接。当TcpServerTcpCliet成功建立连接后,都会通过TcpConnection对象进行收发数据通信。

    1、TCP连接的建立、断开流程

    TcpServer新建连接的函数相关调用时序见下图,其中cpServer左侧是等待连接的过程。TcpServer成员Acceptor持有一个listening socket,封装成channel并注册到EvenetLoop的Poller,等到新客户端连接,Poller监听到IO事件,派发到channel,执行Acceptor注册的回调,返回已连接的socket和对端ip和端口给TcpServer,进而创建一个表示当前连接的TcpConnection。TcpConnection将已连接的socket封装成channel,注册到EventLoop中的Poller,监听其可读、可写、关闭、出错等IO事件。多个tcp连接会保存在TcpServer的TcpConnection map结构中。
    在这里插入图片描述
    muduo只有一种方式断开连接:被动关闭,即客户端发起关闭连接。已连接的socket fd在时间循环中触发read返回0的事件被channel接受处理,调用TcpServer注册的removeConn()回调函数执行关闭。TcpServer首先从连接的TcpConnection map中删除当前连接,接着在IO线程中调用TcpConnection::connectDestroyed()函数,通过channel设置不再关注IO事件,调用服务端断连回调,最后从EventLoop移除channel、Poller中注销。
    在这里插入图片描述
    TcpConnection是muduo库中唯一使用shared_ptr来管理的类,因为它的生命周期模糊。当建立连接后,TcpServer中的map持有TcpConnection的shared_ptr指针,当TcpConnetion从TcpServer的连接map中erase()移除后,TcpConnetion的引用计数减一。如果用户不持有TcpConnection的shared_ptr指针,那么在调用TcpConnection::connectDestroyed()后,会出现访问已经析构的对象。因此,在TcpConnection内部在注册回调函数时,使用bind绑定其shared_ptr调用shared_from_this()来获取,以此增加引用计数、延长生命周期。

    TcpConnection中封装了InputBufferOutputBuffer,用来表示应用层的缓冲区,调用者提供要发送的数据即可,不用操心数据如何发送、是否分次发送等问题。

    2、TcpConnetion定义

    ///
    /// TCP connection, for both client and server usage.
    ///
    /// This is an interface class, so don't expose too much details.
    class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
    {
     public:
      /// Constructs a TcpConnection with a connected sockfd
      ///
      /// User should not create this object.
      TcpConnection(EventLoop* loop,const string& name,  
      			    int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr);
      ~TcpConnection();
    
      // 获取当前TcpConnetction的一些属性、状态
      EventLoop* getLoop() const { return loop_; }
      const string& name() const { return name_; }
      const InetAddress& localAddress() const { return localAddr_; }
      const InetAddress& peerAddress() const { return peerAddr_; }
      bool connected() const { return state_ == kConnected; }
      bool disconnected() const { return state_ == kDisconnected; }
      // return true if success.
      bool getTcpInfo(struct tcp_info*) const;
      string getTcpInfoString() const;
    
      /// 提供上层调用的发送接口,直接发送或保存在Buffer中
      // void send(string&& message); // C++11
      void send(const void* message, int len);
      void send(const StringPiece& message);
      // void send(Buffer&& message); // C++11
      void send(Buffer* message);  // this one will swap data
    	
      // 关闭连接,设置TCP选项
      void shutdown(); // NOT thread safe, no simultaneous calling
      // void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
      void forceClose();
      void forceCloseWithDelay(double seconds);
      void setTcpNoDelay(bool on);
      
      // 开启/关闭当前连接socket上的可读事件监听(通过channel传递到Poller)
      void startRead();
      void stopRead();
      bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop
    
      // 中间变量
      void setContext(const std::any& context){ context_ = context; }
      const std::any& getContext() const { return context_; }
      std::any* getMutableContext() { return &context_; }
    
      // 回调函数设置
      void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; }
      void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }
      void setWriteCompleteCallback(const WriteCompleteCallback& cb){ writeCompleteCallback_ = cb; }
      // 限制发送缓冲区最大的数据堆积量,一旦超过设定值调用指定回调函数
      void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
    
      /// Advanced interface 输入输出缓冲区,应用层不用关系底层处理流程
      Buffer* inputBuffer() { return &inputBuffer_; }
      Buffer* outputBuffer() { return &outputBuffer_; }
    
      /// Internal use only.
      void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; }
    
      // called when TcpServer accepts a new connection
      void connectEstablished();   // should be called only once
      // called when TcpServer has removed me from its map
      void connectDestroyed();  // should be called only once
    
     private:
      enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
      void handleRead(Timestamp receiveTime);
      void handleWrite();
      void handleClose();
      void handleError();
      // void sendInLoop(string&& message);
      void sendInLoop(const StringPiece& message);
      void sendInLoop(const void* message, size_t len);
      void shutdownInLoop();
      // void shutdownAndForceCloseInLoop(double seconds);
      void forceCloseInLoop();
      void setState(StateE s) { state_ = s; }
      const char* stateToString() const;
      void startReadInLoop();    // 开始接收可读事件
      void stopReadInLoop();
    
      EventLoop* loop_;
      const string name_;
      StateE state_;  // FIXME: use atomic variable
      bool reading_;
      // 已连接的socketfd的封装Socket、Channel对象
      std::unique_ptr<Socket> socket_;
      std::unique_ptr<Channel> channel_;
      
      const InetAddress localAddr_;
      const InetAddress peerAddr_;
    
      // IO事件回调,要求在EventLoop中执行
      ConnectionCallback connectionCallback_;
      MessageCallback messageCallback_;
      WriteCompleteCallback writeCompleteCallback_;
      HighWaterMarkCallback highWaterMarkCallback_;
      CloseCallback closeCallback_;
      
      // 底层的输入、输出缓冲区的处理
      size_t highWaterMark_;
      Buffer inputBuffer_;
      Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
      std::any context_;
      // FIXME: creationTime_, lastReceiveTime_
      //        bytesReceived_, bytesSent_
    };
    
    typedef std::shared_ptr<TcpConnection> TcpConnectionPtr; // 智能指针管理
    

    2、TcpConnetion实现

    2.1、构造析构

    每一个TcpConnection都由TcpServer创建,构造时传递所属的EvenetLoop、已连接的socketfd、本端地址、对端地址。默认连接状态是kConneting。注册了channel上事件的回调处理函数。注意,此时channel上不关注任何IO事件。

    析构时,不对socketfd进行关闭的处理,作为服务端多由外部发起执行关闭。

    TcpConnection::TcpConnection(EventLoop* loop,
                                 const string& nameArg,
                                 int sockfd,
                                 const InetAddress& localAddr,
                                 const InetAddress& peerAddr)
      : loop_(CHECK_NOTNULL(loop)),
        name_(nameArg),
        state_(kConnecting),
        reading_(true),
        socket_(new Socket(sockfd)), 			// 已连接socketfd封装的Socket对象
        channel_(new Channel(loop, sockfd)),    // 构造的channel
        localAddr_(localAddr),
        peerAddr_(peerAddr),
        highWaterMark_(64*1024*1024)   // 缓冲区数据最大64M
    {
      // 向Channel对象注册可读事件
      channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));
      channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));
      channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));
      channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));
      LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this << " fd=" << sockfd;
      // 设置保活机制
      socket_->setKeepAlive(true);
    }
    
    TcpConnection::~TcpConnection()
    {
      LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
                << " fd=" <<