当前位置 博文首页 > wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Con
类比于TcpServer被动接受连接使用Acceptor,TcpClient主动发起连接使用Connector。
Connector使用的几个要点:
Connector 在TcpClient中创建,传递所属的EventLoop、服务端的地址。暴露给外部使用接口只有5个。大部分connect的操作都在loop中执行。
class Connector : noncopyable,
public std::enable_shared_from_this<Connector>
{
public:
typedef std::function<void (int sockfd)> NewConnectionCallback; // 连接成回调函数原型
Connector(EventLoop* loop, const InetAddress& serverAddr); // 构造函数
~Connector();
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }
void start(); // 安全,实际调用EventLoop::queueInLoop()
void restart(); // 不安全,只能在lopp thread调用
void stop(); // 安全,实际调用EventLoop::queueInLoop()
const InetAddress& serverAddress() const { return serverAddr_; }
private:
enum States { kDisconnected, kConnecting, kConnected };
static const int kMaxRetryDelayMs = 30*1000; // 重连的最大延迟时间
static const int kInitRetryDelayMs = 500; // 初始重连的延迟时间
void setState(States s) { state_ = s; }
void startInLoop(); // start()的queueInLoop实际调用
void stopInLoop(); // stop()的queueInLoop实际调用
void connect(); // 创建sockefd,调用::connetc(2)系统调用
void connecting(int sockfd); // ::connetc(2)连接成,构造Channel,注册事件
void handleWrite(); // 处理已连接的socketfd可读事件
void handleError(); // 处理已连接的socketfd错误事件
void retry(int sockfd); // 尝试重连
int removeAndResetChannel(); // 断开连接,重置Channel
void resetChannel();
EventLoop* loop_; // 所属的事件循环loop
InetAddress serverAddr_; // 要连接的服务端地址
bool connect_; // atomic // 是否要连接的标志
States state_; // FIXME: use atomic variable
std::unique_ptr<Channel> channel_; // 客户端用于通信的socketfd创建的channel
NewConnectionCallback newConnectionCallback_; // 建立成功时的回调函数
int retryDelayMs_; // 重连的延迟时间ms
};
构造初始化列表对成员变量赋值。析构函数不做任何处理,关闭连接使用特定函数。
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop), // 所属EvenetLoop
serverAddr_(serverAddr), // 要连接的服务端地址
connect_(false), // 现在不去建立连接
state_(kDisconnected), // 初始连接状态
retryDelayMs_(kInitRetryDelayMs) // 重连延迟时间
{
LOG_DEBUG << "ctor[" << this << "]";
}
Connector::~Connector()
{
LOG_DEBUG << "dtor[" << this << "]";
assert(!channel_);
}
Connector的持有者TcpClient调用 start(),开始连接,内部调用loop_->runInLoop,正常情况调用connect()函数,分步骤如下:
注意,仅在设置connect_为true时,才发起连接处理。如果调用stop()断开连接后,这里的代码将不处理连接。
void Connector::start()
{
connect_ = true;
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}
void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_){ // 仅处理 开始连接标志位true的情况
connect();
}
else{
LOG_DEBUG << "do not connect";
}
}
void Connector::connect()
{
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0:
case EINPROGRESS: // 非阻塞不能立即返回,正在连接中,需后续socketfd可读检查SO_ERROR
case EINTR: // 连接被SIGNAL中断
case EISCONN: // 已连接状态
connecting(sockfd);
break;
case EAGAIN: // 本地地址处于使用状态
case EADDRINUSE: // 地址已被使用
case EADDRNOTAVAIL: // 无可用的本地端口用于连接
case ECONNREFUSED: // 服务端地址上没有socket监听
case ENETUNREACH: // 网络不可达(防火墙?)
retry(sockfd);
break;
case EACCES:
case EPERM: // 连接到广地地址但未设置该标志位;本地防火墙规则不允许连接
case EAFNOSUPPORT: // 不支持当前sa_family
case EALREADY: // socket是非阻塞的,但前一个连接请求尚未处理结束
case EBADF: // sockfd参数在描述符表中无效
case EFAULT: // socket数据结构不在用户地址空间
case ENOTSOCK: // fd不是sokcet类型
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
开始连接中的处理过程,设置当前连接状态kConnecting,将当前socketfd构造成Channel,并注册可写、错误事件到Poller。通过Poller监听的可读事件进一步判断连接状态。
void Connector::connecting(int sockfd)
{
setState(kConnecting); // 设置状态
assert(!channel_);
// 重设channel_,设置可写事件回调、错误事件回调
channel_.reset(new Channel(loop_, sockfd));
channel_->setWriteCallback(std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
channel_->setErrorCallback(std::bind(&Connector::handleError, this)); // FIXME: unsafe
// channel_->tie(shared_from_this()); is not working,
// as channel_ is not managed by shared_ptr
channel_->enableWriting(); // 注册到Poller
}
监听到socketfd的可读事件,确认当前连接是否成功建立,如果当前状态是kDisconnected,则不进行处理。
如果当前状态是kConnecting连接中的状态,重置channel_,并从socketfd获取SO_ERROR。如果SO_ERROR值不为零有错误,或者是自连接,则尝试retry(sockfd)重连;SO_ERROR值为0表示连接成功,正常进行回调传递已连接socketfd给TcpClient。在处理错误事件中处理状态是kConnecting的情况,重置channel_并尝试重连。
注意,调用removeAndResetChannel()函数返回socketfd,是因为当前socketfd是已经连接上服务端的,后续TcpClient需要接管socketfd并创建TcpConnection,并将socketd的IO事件注册到Poller,进行服务端-客户端之间的通信,因此在连接上的期间需要当前Connector不再管理该描述符。
void Connector::handleWrite()
{
LOG_TRACE << "Connector::handleWrite " << state_;
if (state_ == kConnecting){ // 当前是kConnecting连接中的状态
int sockfd = removeAndResetChannel(); // 重置连接,并返回上一次连接使用的soketfd
int err = sockets::getSocketError(sockfd); // 获取上一次错误
if (err){ // 连接失败
LOG_WARN << "Connector::handleWrite - SO_ERROR = " << err << " " << strerror_tl(err);
retry(sockfd);
}
else if (sockets::isSelfConnect(sockfd)){ // 自连接
LOG_WARN << "Connector::handleWrite - Self connect";
retry(sockfd);
}
else{ // 错误值0, 连接成功
setState(kConnected); // 设置为kConnected已连接状态
if (connect_){
newConnectionCallback_(sockfd); // 回调TcpClient的连接成功回调,返回已连接socketfd
}
else{
sockets::close(sockfd); // 若不进行连接,则关闭当前已连接socketfd