当前位置 博文首页 > wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Acc

    wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Acc

    作者:[db:作者] 时间:2021-09-09 09:44

    前述文章围绕base、net两个模块各种组件,已经形成了初具规模的Reactor事件处理框架。从现在开始,逐步实现一个非阻塞的TCP网络编程库。不同于传统的Reactor,将timers 做成循环中单独的一步,muduo将 timers 和 IO handlers 视为等同的。
    在这里插入图片描述
    整个TCP网络库,分为服务端和客户端。服务端TcpServer使用reactor模式进行IO事件循环,使用Acceptor接收新的连接,每个连接都是一个TcpConnetion对象。客户端TcpClient使用Connector发起连接,通过TcpConection收发数据。客户端主动发起连接比服务端被动接收连接需要额外的错误处理、考虑重连,因此先介绍服务端的实现,本文先以Acceptor为开端。

    1、Acceptor定义

    Acceptor class用于accept(2)新TCP连接,并通过回调通知使用者,它是内部class,在上层应用程序中我们不直接使用,而是把它封装作为TcpServer的成员,生命期由后者控制。

    class Acceptor : noncopyable
    {
     public:
      typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;
    
      Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
      ~Acceptor();
    
      void setNewConnectionCallback(const NewConnectionCallback& cb) 
      { newConnectionCallback_ = cb; } // 设置新连接建立的回调函数
    
      bool listenning() const { return listenning_; } // 返回当前监听状态
      void listen();    // 开启监听
    
     private:
      void handleRead();
    
      EventLoop* loop_;			// 基本是main reactor
      Socket acceptSocket_;		// 用于接收新连接的scoket封装
      Channel acceptChannel_;   // 封装acceptSocket_的channel,监听其上的事件
      NewConnectionCallback newConnectionCallback_;  // 建立新连接时调用的回调函数
      bool listenning_;			// 是否在监听的
      int idleFd_;				// 空闲的描述符
    };
    

    定义比较简单,直接看实现。

    2、Acceptor实现

    (1)构造和析构

    构造时,需要传递当前Acceptor所属的EventLoop,一般是是主线程或者main reactor中。创建一个listening socket,绑定指定ip和port,并构造一个channel用于监听此的连接。为处理无可用描述符时的连接请求,使用一个预留fd处理。

    析构时,关闭channel关注的IO事件,从Poller注销,关闭socket对象。

    Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
      : loop_(loop),
        acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
        acceptChannel_(loop, acceptSocket_.fd()),
        listenning_(false),
        idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
    {
      assert(idleFd_ >= 0);
      // 设置服务端socket选项,并绑定到指定ip和port
      acceptSocket_.setReuseAddr(true);      // addr重用
      acceptSocket_.setReusePort(reuseport); // 端口重用
      acceptSocket_.bindAddress(listenAddr); // bing
      // 使用channel监听socket上的可读事件(新的连接)
      acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
    }
    
    Acceptor::~Acceptor()
    {
      // 不关注socket上的IO事件,从EventLoop的Poller注销
      acceptChannel_.disableAll();
      acceptChannel_.remove();
      // 关闭socket
      ::close(idleFd_);
    }
    

    (2)开始监听新连接

    将状态设置为监听,然后调用监听socket的listen函数,将监听channel的读事件注册到poller的管理。

    void Acceptor::listen()
    {
      loop_->assertInLoopThread();
      listenning_ = true;
      acceptSocket_.listen();
      //   
      acceptChannel_.enableReading();
    }
    

    (3)处理新连接

    当有客户端发起连接时,监听channel触发读事件,那么调用其对应的回调函数Acceptor::handleRead()

    void Acceptor::handleRead()
    {
      loop_->assertInLoopThread();
      InetAddress peerAddr;
      //FIXME loop until no more
      int connfd = acceptSocket_.accept(&peerAddr); // 这里是真正接收连接 
      if (connfd >= 0) //新的连接成功
      {
        // string hostport = peerAddr.toIpPort();
        // LOG_TRACE << "Accepts of " << hostport;
        if (newConnectionCallback_){
          // 建立新的连接,调用TcpServer的回调,返回已连接的socketfd和peer端地址
          newConnectionCallback_(connfd, peerAddr);
        }
        else{
          // 若上层应用TcpServer未注册新连接回调函数,则直接关闭当前连接
          sockets::close(connfd); 
        }
      }
      else  // 连接异常,处理服务端fd耗尽
      {
        LOG_SYSERR << "in Acceptor::handleRead";
        // Read the section named "The special problem of
        // accept()ing when you can't" in libev's doc.
        // By Marc Lehmann, author of libev.
        if (errno == EMFILE)   // 无可用fd,如不处理,否则Poller水平触发模式下会一直触发
        {
          ::close(idleFd_); // 关闭空闲的fd,空出一个可用的fd
          idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); // 把前面调用acceptor没有接受的描述符接受进来到idleFd_ 
          ::close(idleFd_); // 把这个idleFd_ 关闭,就是关闭了当前此次连接
          idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); // 重新开启这个空闲描述符
        }
      }
    }
    

    当前文件描述符过多,无法接收新的连接。但是由于Poller采用LT模式,如果无法接收,可读事件会一直触发。那么在这个地方的处理机制就是,关掉之前创建的空闲的idleFd_,然后去accept让这个事件不会一直触发,然后再关掉该文件描述符,重新将它设置为空文件描述符。这种机制可以让网络库在处理连接过多、文件描述符不够用时,不至于因为LT模式一直触发而产生坏的影响。

    cs