当前位置 博文首页 > wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Con

    wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Con

    作者:[db:作者] 时间:2021-09-09 09:42

    类比于TcpServer被动接受连接使用Acceptor,TcpClient主动发起连接使用Connector。

    Connector使用的几个要点:

    • 非阻塞网络编程中,发起连接的基本方式是调用connect(2),当socket变得可写时表明连接建立完毕,其中要处理各种类型的错误。
    • Connector只负责建立socket连接,不负责创建TcpConnection(后文中的TcpClient类实现TcpConnection的创建),它的ConnectionCallBack回调的参数的socket fd。
    • 错误处理时,socket可写不一定就是连接建立好了。当连接建立出错时,套接口描述符变成既可读又可写,这时我们可以通过调用getsockopt来得到套接口上待处理的错误(SO_ERROR).
    • 非阻塞网络编程中connect(2)的sockfd是一次性的,一旦出错(比如对方拒绝连接)就无法恢复,只能关闭重来。 但Connector是反复使用的, 因此每次尝试连接都要使用新的socket文件描述符和新的Channel对象,要注意Channel的生命期管理。
      在这里插入图片描述

    1、Connector 的定义

    Connector 在TcpClient中创建,传递所属的EventLoop、服务端的地址。暴露给外部使用接口只有5个。大部分connect的操作都在loop中执行。

    class Connector : noncopyable, 
    				  public std::enable_shared_from_this<Connector>
    {
     public:
      typedef std::function<void (int sockfd)> NewConnectionCallback; // 连接成回调函数原型
    
      Connector(EventLoop* loop, const InetAddress& serverAddr);  // 构造函数
      ~Connector();
    
      void setNewConnectionCallback(const NewConnectionCallback& cb)
      { newConnectionCallback_ = cb; }
    
      void start();  // 安全,实际调用EventLoop::queueInLoop()
      void restart();  // 不安全,只能在lopp thread调用
      void stop();  // 安全,实际调用EventLoop::queueInLoop()  
    
      const InetAddress& serverAddress() const { return serverAddr_; }
    
     private:
      enum States { kDisconnected, kConnecting, kConnected };
      static const int kMaxRetryDelayMs = 30*1000;  // 重连的最大延迟时间
      static const int kInitRetryDelayMs = 500;     // 初始重连的延迟时间
    
      void setState(States s) { state_ = s; }
      void startInLoop();  			// start()的queueInLoop实际调用
      void stopInLoop();			// stop()的queueInLoop实际调用
      void connect();				// 创建sockefd,调用::connetc(2)系统调用
      void connecting(int sockfd);	// ::connetc(2)连接成,构造Channel,注册事件
      void handleWrite();			// 处理已连接的socketfd可读事件
      void handleError();			// 处理已连接的socketfd错误事件
      void retry(int sockfd);		// 尝试重连
      int removeAndResetChannel();	// 断开连接,重置Channel
      void resetChannel();
    
      EventLoop* loop_;				// 所属的事件循环loop
      InetAddress serverAddr_;     	// 要连接的服务端地址
      bool connect_; // atomic     	// 是否要连接的标志
      States state_;  // FIXME: use atomic variable
      std::unique_ptr<Channel> channel_;			// 客户端用于通信的socketfd创建的channel
      NewConnectionCallback newConnectionCallback_; // 建立成功时的回调函数
      int retryDelayMs_;  							// 重连的延迟时间ms
    };
    

    2、Connector 的实现

    2.1、构造、析构

    构造初始化列表对成员变量赋值。析构函数不做任何处理,关闭连接使用特定函数。

    Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
      : loop_(loop),						// 所属EvenetLoop
        serverAddr_(serverAddr),			// 要连接的服务端地址
        connect_(false),					// 现在不去建立连接
        state_(kDisconnected),				// 初始连接状态
        retryDelayMs_(kInitRetryDelayMs)	// 重连延迟时间
    {
      LOG_DEBUG << "ctor[" << this << "]";
    }
    
    Connector::~Connector()
    {
      LOG_DEBUG << "dtor[" << this << "]";
      assert(!channel_);
    }
    

    2.2、准备连接 start()

    Connector的持有者TcpClient调用 start(),开始连接,内部调用loop_->runInLoop,正常情况调用connect()函数,分步骤如下:

    • 创建非阻塞的sockets::createNonblockingOrDie(),返回socketfd
    • 调用::connect()系统调用,根据返回值判断连是否成功,分为三种情况:
      • 返回值0,正常连接成功;返回值为EINPROGRESS表示连接正在进行,等后续socketfd可读后进一步确认,或者尝试重连。
      • 本机、服务端临时错误,可以尝试重连
      • 其他如权限、协议错误等,直接关闭连接
    • 根据返回值进一步处理,选择连接处理、重连、关闭

    注意,仅在设置connect_为true时,才发起连接处理。如果调用stop()断开连接后,这里的代码将不处理连接。

    void Connector::start()
    {
      connect_ = true;
      loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
    }
    
    void Connector::startInLoop()
    {
      loop_->assertInLoopThread();
      assert(state_ == kDisconnected);
      if (connect_){    // 仅处理 开始连接标志位true的情况
        connect();
      }
      else{
        LOG_DEBUG << "do not connect";
      }
    }
    
    void Connector::connect()
    {
      int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
      int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
      int savedErrno = (ret == 0) ? 0 : errno;
      switch (savedErrno)
      {
        case 0:
        case EINPROGRESS:      	// 非阻塞不能立即返回,正在连接中,需后续socketfd可读检查SO_ERROR
        case EINTR:				// 连接被SIGNAL中断
        case EISCONN:			// 已连接状态
          connecting(sockfd);
          break;
    
        case EAGAIN:			// 本地地址处于使用状态
        case EADDRINUSE:		// 地址已被使用
        case EADDRNOTAVAIL:		// 无可用的本地端口用于连接
        case ECONNREFUSED:	    // 服务端地址上没有socket监听
        case ENETUNREACH:		// 网络不可达(防火墙?)
          retry(sockfd);
          break;
    
        case EACCES:			
        case EPERM:				// 连接到广地地址但未设置该标志位;本地防火墙规则不允许连接
        case EAFNOSUPPORT:		// 不支持当前sa_family
        case EALREADY:			// socket是非阻塞的,但前一个连接请求尚未处理结束
        case EBADF:  			// sockfd参数在描述符表中无效
        case EFAULT:			// socket数据结构不在用户地址空间
        case ENOTSOCK:			// fd不是sokcet类型
          LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
          sockets::close(sockfd);
          break;
    
        default:
          LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
          sockets::close(sockfd);
          // connectErrorCallback_();
          break;
      }
    }
    

    2.3、连接中的处理 connecting()

    开始连接中的处理过程,设置当前连接状态kConnecting,将当前socketfd构造成Channel,并注册可写、错误事件到Poller。通过Poller监听的可读事件进一步判断连接状态。

    void Connector::connecting(int sockfd)
    {
      setState(kConnecting);  // 设置状态
      assert(!channel_);
      // 重设channel_,设置可写事件回调、错误事件回调
      channel_.reset(new Channel(loop_, sockfd));
      channel_->setWriteCallback(std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
      channel_->setErrorCallback(std::bind(&Connector::handleError, this)); // FIXME: unsafe
    
      // channel_->tie(shared_from_this()); is not working,
      // as channel_ is not managed by shared_ptr
      channel_->enableWriting();   // 注册到Poller
    }
    

    2.4、处理事件 handleWrite()、handleError()

    监听到socketfd的可读事件,确认当前连接是否成功建立,如果当前状态是kDisconnected,则不进行处理。

    如果当前状态是kConnecting连接中的状态,重置channel_,并从socketfd获取SO_ERROR。如果SO_ERROR值不为零有错误,或者是自连接,则尝试retry(sockfd)重连;SO_ERROR值为0表示连接成功,正常进行回调传递已连接socketfd给TcpClient。在处理错误事件中处理状态是kConnecting的情况,重置channel_并尝试重连。

    注意,调用removeAndResetChannel()函数返回socketfd,是因为当前socketfd是已经连接上服务端的,后续TcpClient需要接管socketfd并创建TcpConnection,并将socketd的IO事件注册到Poller,进行服务端-客户端之间的通信,因此在连接上的期间需要当前Connector不再管理该描述符。

    void Connector::handleWrite()
    {
      LOG_TRACE << "Connector::handleWrite " << state_;
    
      if (state_ == kConnecting){   // 当前是kConnecting连接中的状态
        int sockfd = removeAndResetChannel();  // 重置连接,并返回上一次连接使用的soketfd
        int err = sockets::getSocketError(sockfd);  // 获取上一次错误
        if (err){  // 连接失败
          LOG_WARN << "Connector::handleWrite - SO_ERROR = " << err << " " << strerror_tl(err);
          retry(sockfd);
        }
        else if (sockets::isSelfConnect(sockfd)){   // 自连接
          LOG_WARN << "Connector::handleWrite - Self connect";
          retry(sockfd);
        }
        else{  //  错误值0, 连接成功
          setState(kConnected);    // 设置为kConnected已连接状态
          if (connect_){	
            newConnectionCallback_(sockfd); // 回调TcpClient的连接成功回调,返回已连接socketfd
          }
          else{
            sockets::close(sockfd); // 若不进行连接,则关闭当前已连接socketfd