当前位置 博文首页 > wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Tcp

    wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Tcp

    作者:[db:作者] 时间:2021-09-09 09:42

    有了前文【muduo学习笔记:net部分之实现TCP网络编程库-Connector】的介绍,TcpClient的实现就不难了。muduo用TcpClient发起连接,TcpClient有一个Connector连接器,TCPClient使用Conneccor发起连接, 连接建立成功后, 用socket创建TcpConnection来管理连接, 每个TcpClient class只管理一个TcpConnecction,连接建立成功后设置相应的回调函数。很显然,TcpClient用来管理客户端连接,真正连接交给Connector。

    它的代码与TcpServer甚至有几分相似,只不过TcpClient只管理一个TcpConnection。先谈几个要点:

    • TcpClient具备TcpConnection断开之后重新连接的功能,加上Connector具备反复尝试连接的功能,因此客户端和服务器的启动顺序无关紧要。可以先启动客户端,一旦服务器启动,半分钟之内即可恢复连接(由Connector::kMaxRetryDelayMs常数控制);再客户端运行期间服务器可以重启,客户端也会自动重连。
    • 连接断开后初次重试的延迟时间是随机的,比方说服务器崩溃,它所有的客户端连接同时断开,然后0.5s之后同时再次发起连接,这样既可能造成SYN丢包,也可能给服务器带来短期大负载,影响其服务质量。因此每个TcpClient应该等待一段随机的时间(0.5~2s),再重试,避免拥塞。
    • 发起连接的时候如果发生TCP SYN丢包,那么系统默认的重试间隔是3s,这期间不会返回错误码,而且这个间隔似乎不容易修改。如果需要缩短间隔,可以再用一个定时器,在0.5s或1s之后发起另一个链接。如果有需求的话,这个功能可以做到Connector中。

    1、TcpClient 定义

    TcpClient使用Conneccor发起连接, 连接建立成功后,用socket创建TcpConnection来管理连接, 每个TcpClient class只管理一个TcpConnecction。

    class TcpClient : noncopyable
    {
     public:
      // TcpClient(EventLoop* loop);
      // TcpClient(EventLoop* loop, const string& host, uint16_t port);
      TcpClient(EventLoop* loop,
                const InetAddress& serverAddr,
                const string& nameArg);
      ~TcpClient();  // force out-line dtor, for std::unique_ptr members.
    
      void connect();
      void disconnect();
      void stop();
    
      TcpConnectionPtr connection() const
      {
        MutexLockGuard lock(mutex_);
        return connection_;
      }
    
      EventLoop* getLoop() const { return loop_; }
      bool retry() const { return retry_; }
      void enableRetry() { retry_ = true; }
    
      const string& name() const { return name_; }
    
      /// Set connection callback.
      /// Not thread safe.
      void setConnectionCallback(ConnectionCallback cb) { connectionCallback_ = std::move(cb); }
    
      /// Set message callback.
      /// Not thread safe.
      void setMessageCallback(MessageCallback cb) { messageCallback_ = std::move(cb); }
    
      /// Set write complete callback.
      /// Not thread safe.
      void setWriteCompleteCallback(WriteCompleteCallback cb) { writeCompleteCallback_ = std::move(cb); }
    
     private:
      /// Not thread safe, but in loop
      void newConnection(int sockfd);
      /// Not thread safe, but in loop
      void removeConnection(const TcpConnectionPtr& conn);
    
      EventLoop* loop_;			// 所属的EvenetLoop
      ConnectorPtr connector_; 	// 使用Connector智能指针,避免头文件引入
      const string name_;		// 连接的名字
      
      ConnectionCallback connectionCallback_;		// 建立连接的回调函数
      MessageCallback messageCallback_;				// 消息到来的回调函数
      WriteCompleteCallback writeCompleteCallback_; // 数据发送完毕回调函数
      bool retry_;   // atomic		// 连接断开后是否重连
      bool connect_; // atomic
      // always in loop thread
      int nextConnId_;				// name_+nextConnId_ 用于标识一个连接
      mutable MutexLock mutex_;
      TcpConnectionPtr connection_ GUARDED_BY(mutex_);
    };
    

    2、TcpClient 实现

    构造TcpClient时,初始化Connector并注册事件回调,用于与服务端的通信,并设置连接成功的回调函数。

    2.1、构造函数

    初始化列表种创建一个Connector,用于连接服务器,若连接成功,调用TcpClient::newConnection()回调函数。

    TcpClient::TcpClient(EventLoop* loop,
                         const InetAddress& serverAddr,
                         const string& nameArg)
      : loop_(CHECK_NOTNULL(loop)),
        connector_(new Connector(loop, serverAddr)),	// 创建一个Connector
        name_(nameArg),
        connectionCallback_(defaultConnectionCallback),
        messageCallback_(defaultMessageCallback),
        retry_(false), 		// 默认不重连
        connect_(true), 	// 开始连接
        nextConnId_(1) 		// 当前连接的序号
    {
      // 设置建立连接的回调函数
      connector_->setNewConnectionCallback(std::bind(&TcpClient::newConnection, this, _1));
      // FIXME setConnectFailedCallback
      LOG_INFO << "TcpClient::TcpClient[" << name_ << "] - connector " << get_pointer(connector_);
    }
    

    2.2、建立连接、断开连接

    connect()函数调用Connector::start(),发起连接。

    void TcpClient::connect()
    {
      // FIXME: check state
      LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to " << connector_->serverAddress().toIpPort();
      connect_ = true;
      connector_->start();  // 调用Connector::start,发起连接
    }
    

    2.3、连接建立成功

    若连接成功,则回调TcpClient::newConnection()函数。 参数是本地已建立连接的sockfd,通过它创建一个TcpConnection,用于后续消息的发送。

    void TcpClient::newConnection(int sockfd)
    {
      loop_->assertInLoopThread();
      InetAddress peerAddr(sockets::getPeerAddr(sockfd));    // 获取对端的地址
      char buf[32];
      snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
      ++nextConnId_;
      string connName = name_ + buf;  // 连接的名字
    
      InetAddress localAddr(sockets::getLocalAddr(sockfd));  // 获取本段的地址
      // FIXME poll with zero timeout to double confirm the new connection
      // FIXME use make_shared if necessary
    
      // 构造一个TcpConnection对象,并设置相应的回调函数
      TcpConnectionPtr conn(new TcpConnection(loop_,
                                              connName,
                                              sockfd,
                                              localAddr,
                                              peerAddr));
      conn->setConnectionCallback(connectionCallback_);
      conn->setMessageCallback(messageCallback_);
      conn->setWriteCompleteCallback(writeCompleteCallback_);
      conn->setCloseCallback(std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
      {
        MutexLockGuard lock(mutex_);
        connection_ = conn;  // 保存到成员变量
      }
      conn->connectEstablished(); // 注册到Poller,监听IO事件
    }
    

    2.4、断开连接disconnect()、关闭连接stop()

    断开连接,仅关闭写功能,仍然能接收对端消息。

    void TcpClient::disconnect()
    {
      connect_ = false;
      {
        MutexLockGuard lock(mutex_);
        if (connection_) {
          connection_->shutdown();  // 半关闭, 能继续完整接收对端的消息
        }
      }
    }
    

    关闭连接,则将完全关闭客户端,不能再进行收、发数据。

    void TcpClient::stop()
    {
      connect_ = false;
      connector_->stop();
    }
    

    2.5、析构

    TcpClient::~TcpClient()
    {
      LOG_INFO << "TcpClient::~TcpClient[" << name_ << "] - connector " << get_pointer(connector_);
      TcpConnectionPtr conn;
      bool unique = false;
      {
        MutexLockGuard lock(mutex_);
        unique = connection_.unique();  // 是否只有一个持有者
        conn = connection_;
      }
      if (conn)  // 连接已经建立成功,TcpConnectionPtr  不为空
      {
        assert(loop_ == conn->getLoop());
        // FIXME: not 100% safe, if we are in different thread
        CloseCallback cb = std::bind(&detail::removeConnection, loop_, _1);
        loop_->runInLoop(std::bind(&TcpConnection::setCloseCallback, conn, cb));
        if (unique){
          conn->forceClose();
        }
      }
      else{  // TcpConnectionPtr为空
        connector_->stop();   // 关闭Connector连接
        // FIXME: HACK
        loop_->runAfter(1, std::bind(&detail::removeConnector, connector_)); 
      }
    }
    

    3、测试

    EchoClient

    #include <muduo/net/TcpClient.h>
    
    #include <muduo/base/Logging.h>
    #include <muduo/base/Thread.h>
    #include <muduo/net/EventLoop.h>
    #include <muduo/net/InetAddress.h>
    
    #include <utility>
    
    #include <stdio.h>
    #include <unistd.h>
    
    using namespace muduo;