当前位置 博文首页 > wanggao的专栏:muduo学习笔记:net部分之实现TCP网络编程库-Acc
前述文章围绕base、net两个模块各种组件,已经形成了初具规模的Reactor事件处理框架。从现在开始,逐步实现一个非阻塞的TCP网络编程库。不同于传统的Reactor,将timers 做成循环中单独的一步,muduo将 timers 和 IO handlers 视为等同的。
整个TCP网络库,分为服务端和客户端。服务端TcpServer
使用reactor模式进行IO事件循环,使用Acceptor接收新的连接,每个连接都是一个TcpConnetion
对象。客户端TcpClient
使用Connector
发起连接,通过TcpConection
收发数据。客户端主动发起连接比服务端被动接收连接需要额外的错误处理、考虑重连,因此先介绍服务端的实现,本文先以Acceptor
为开端。
Acceptor
class用于accept(2)新TCP连接,并通过回调通知使用者,它是内部class,在上层应用程序中我们不直接使用,而是把它封装作为TcpServer
的成员,生命期由后者控制。
class Acceptor : noncopyable
{
public:
typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;
Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
~Acceptor();
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; } // 设置新连接建立的回调函数
bool listenning() const { return listenning_; } // 返回当前监听状态
void listen(); // 开启监听
private:
void handleRead();
EventLoop* loop_; // 基本是main reactor
Socket acceptSocket_; // 用于接收新连接的scoket封装
Channel acceptChannel_; // 封装acceptSocket_的channel,监听其上的事件
NewConnectionCallback newConnectionCallback_; // 建立新连接时调用的回调函数
bool listenning_; // 是否在监听的
int idleFd_; // 空闲的描述符
};
定义比较简单,直接看实现。
(1)构造和析构
构造时,需要传递当前Acceptor
所属的EventLoop
,一般是是主线程或者main reactor中。创建一个listening socket,绑定指定ip和port,并构造一个channel用于监听此的连接。为处理无可用描述符时的连接请求,使用一个预留fd处理。
析构时,关闭channel关注的IO事件,从Poller注销,关闭socket对象。
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
// 设置服务端socket选项,并绑定到指定ip和port
acceptSocket_.setReuseAddr(true); // addr重用
acceptSocket_.setReusePort(reuseport); // 端口重用
acceptSocket_.bindAddress(listenAddr); // bing
// 使用channel监听socket上的可读事件(新的连接)
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
Acceptor::~Acceptor()
{
// 不关注socket上的IO事件,从EventLoop的Poller注销
acceptChannel_.disableAll();
acceptChannel_.remove();
// 关闭socket
::close(idleFd_);
}
(2)开始监听新连接
将状态设置为监听,然后调用监听socket的listen函数,将监听channel的读事件注册到poller的管理。
void Acceptor::listen()
{
loop_->assertInLoopThread();
listenning_ = true;
acceptSocket_.listen();
//
acceptChannel_.enableReading();
}
(3)处理新连接
当有客户端发起连接时,监听channel触发读事件,那么调用其对应的回调函数Acceptor::handleRead()
。
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr); // 这里是真正接收连接
if (connfd >= 0) //新的连接成功
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_){
// 建立新的连接,调用TcpServer的回调,返回已连接的socketfd和peer端地址
newConnectionCallback_(connfd, peerAddr);
}
else{
// 若上层应用TcpServer未注册新连接回调函数,则直接关闭当前连接
sockets::close(connfd);
}
}
else // 连接异常,处理服务端fd耗尽
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE) // 无可用fd,如不处理,否则Poller水平触发模式下会一直触发
{
::close(idleFd_); // 关闭空闲的fd,空出一个可用的fd
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); // 把前面调用acceptor没有接受的描述符接受进来到idleFd_
::close(idleFd_); // 把这个idleFd_ 关闭,就是关闭了当前此次连接
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); // 重新开启这个空闲描述符
}
}
}
当前文件描述符过多,无法接收新的连接。但是由于Poller采用LT模式,如果无法接收,可读事件会一直触发。那么在这个地方的处理机制就是,关掉之前创建的空闲的idleFd_,然后去accept让这个事件不会一直触发,然后再关掉该文件描述符,重新将它设置为空文件描述符。这种机制可以让网络库在处理连接过多、文件描述符不够用时,不至于因为LT模式一直触发而产生坏的影响。
cs