当前位置 博文首页 > changke的博客:I/O多路复用深度解析
这两天刷算法,感觉自己凉了,不看人家的答案自己连题都看不懂。天赋吧,这没得救了。于是决定先把算法放一下,找点比较有意思的事情做做,所以就突发奇想不如看一下epoll源码吧,整天用这个东西,了解一下他实现有益无害吧!
下面我们思考一个问题:
这个问题在网上没找到答案,可能这个“为什么”提出来并没有什么意义。其实答案比较简单,多路复用是在单个线程中,实现跟踪多socket套接字状态变化的一种机制。在不使用IO复用的情况下,没法实现单线程检测多个套接字状态变化。(以上是个人见解,仅供参考)
我们都使用select,poll,epoll这些系统调用来实现IO复用机制。对于前两者,每次只要有就绪事件,要将所有fd传递给select或poll系统调用,这意味着将所有的fd从用户态拷贝到内核态。这就多余了一些没有意义的拷贝,因为有些描述符上还没有IO事件的发生,只因为其他的描述符有IO事件就被拷贝到内核态。
再一个有必要区分一下select和poll,select系统调用通过一个位数组fd_bits收集要监听的描述符,内核中的宏定义限制了最大可监听的描述符数量为1024个。
而对于poll没有限制监听的数量,只要来一个新连接,就将这个连接加到poll维护的描述符链表中。
对于以上两种IO复用机制都没有解决避免复制非就绪连接到内核的无效操作。对于连接较少的状况,以上三种IO复用机制效率相当,但要是连接数量上升,程序中分别使用三者的性能差异就显而易见了。
epoll为什么高效快捷?
epoll相关的系统调用有:epoll_create, epoll_ctl和epoll_wait。
在Linux系统中,epoll想内核注册了一个epoll文件系统,用来存储被监控的文件描述符。
当epoll_create时,就会在epoll文件系统创建file节点,只服务于epoll。当epoll在内核中被初始化时,操作系统回为epoll开辟出一个内核高速缓冲区(连续的物理内存页
),用来保存保存红黑树结构
。
static int __init eventpoll_init(void)
{
mutex_init(&pmutex);
ep_poll_safewake_init(&psw);
//开辟slab高速缓冲区
epi_cache = kmem_cache_create(“eventpoll_epi”,sizeof(struct epitem),
0,SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|
SLAB_PANIC,NULL);
pwq_cache = kmem_cache_create(“eventpoll_pwq”,sizeof(struct
eppoll_entry),0,EPI_SLAB_DEBUG|SLAB_PANIC,NULL);
return 0;
}
在刚开始学习epoll的时候就听过epoll底层的实现核心数据结构就是红黑树,红黑树是一种自平衡二叉树,在查找和删除效率较高。
在调用epoll_create后,epoll底层实现会在epoll的高速缓冲区中创建一个红黑树用来接收后面到来的连接,再者就是会创建一个就绪链表,保存发生就绪事件的fd:
//当来一个新连接,基于slab高速缓存(可以理解为一种分配内存的机制,对于频繁的使用和销毁特别的高效),创建一个epitem对象,保存下面描述的信息,然后将这个对象加到红黑树中。
struct epitem {
struct rb_node rbn; //用于主结构管理的红黑树
struct list_head rdllink; //事件就绪队列
struct epitem *next; //用于主结构体中的链表
struct epoll_filefd ffd; //这个结构体对应的被监听的文件描述符信息
int nwait; //poll操作中事件的个数
struct list_head pwqlist; //双向链表,保存着被监视文件的等待队列,功能类似于select/poll中的poll_table
struct eventpoll *ep; //该项属于哪个主结构体(多个epitm从属于一个eventpoll)
struct list_head fllink; //双向链表,用来链接被监视的文件描述符对应的struct file。因为file里有f_ep_link,用来保存所有监视这个文件的epoll节点
struct epoll_event event; //注册的感兴趣的事件,也就是用户空间的epoll_event
}
//每个epoll fd(epfd)对应的主要数据结构为:
struct eventpoll {
spin_lock_t lock; //对本数据结构的访问
struct mutex mtx; //防止使用时被删除
wait_queue_head_t wq; //sys_epoll_wait() 使用的等待队列
wait_queue_head_t poll_wait; //file->poll()使用的等待队列
struct list_head rdllist; //事件满足条件的链表
struct rb_root rbr; //用于管理所有fd的红黑树(树根)
struct epitem *ovflist; //将事件到达的fd进行链接起来发送至用户空间
}
//内核通过创建一个fd与这个结构相关联,管理红黑树结构,即epoll_create1实现
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
//
struct eventpoll *ep = NULL;
struct file *file;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file);
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
//创建并初始化eventpoll结构体实例,由ep_alloc实现(第52行)可知,调用kzalloc在内核空间申请内存并初始化清零。
static int ep_alloc(struct eventpoll **pep)
{
int error;
struct user_struct *user;
struct eventpoll *ep;
user = get_current_user();
error = -ENOMEM;
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
if (unlikely(!ep))
goto free_uid;
spin_lock_init(&ep->lock);
mutex_init(&ep->mtx);
init_waitqueue_head(&ep->wq);
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
ep->rbr = RB_ROOT;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
*pep = ep;
return 0;
free_uid:
free_uid(user);
return error;
}
当红黑树中有IO事件时,会通过红黑树的删除操作将节点取下放到就绪事件链表rdlist中(也就是将就绪事件从红黑树中取出放到就绪链表中),然后通过epoll_wait返回到用户进程,在进程中进行逻辑处理或者又进行IO处理,所以对于epoll来说只需关心就绪链表中的描述符。不必像poll和select一样只要有个别的fd发生IO活动就遍历整个集合或者链表。
通过前面的例子发现这一说法是站不住脚的,通过源码我们可以看出epoll高速缓存是存在的,他就是用来保存红黑树结构的,来一个新连接,加到红黑树中进行检测,这些已经在逻辑上很合理了,如果存在用户空间和内核空间共享内存的话,那岂不是画蛇添足?所以不论是epoll fd本身或者epitem都是基于内核内存分配创建,不存在mmap之说。
下面代码主要是往红黑树中添加节点EPOLL_CTL_ADD,还有就是取出就绪事件的过程:
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)
{
int error ,revents,pwake = 0;
unsigned long flags ;
struct epitem *epi;
/*
struct ep_queue{
poll_table pt;
struct epitem *epi;
} */
struct ep_pqueue epq;
//分配一个epitem结构体来保存每个加入的fd
if(!(epi = kmem_cache_alloc(epi_cache,GFP_KERNEL)))
goto error_return;
//初始化该结构体
ep_rb_initnode(&epi->rbn);
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd,tfile,fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
epq.epi = epi;
//安装poll回调函数
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc );
/* 调用poll函数来获取当前事件位,其实是利用它来调用注册函数ep_ptable_queue_proc(poll_wait中调用)。
如果fd是套接字,f_op为socket_file_ops,poll函数是
sock_poll()。如果是TCP套接字的话,进而会调用
到tcp_poll()函数。此处调用poll函数查看当前
文件描述符的状态,存储在revents中。
在poll的处理函数(tcp_poll())中,会调用sock_poll_wait(),
在sock_poll_wait()中会调用到epq.pt.qproc指向的函数,
也就是ep_ptable_queue_proc()。 */
revents = tfile->f_op->poll(tfile, &epq.pt);
spin_lock(&tfile->f_ep_lock);
list_add_tail(&epi->fllink,&tfile->f_ep_lilnks);
spin_unlock(&tfile->f_ep_lock);
ep_rbtree_insert(ep,epi); //将该epi插入到ep的红黑树中
spin_lock_irqsave(&ep->lock,flags);
// revents & event->events:刚才fop->poll的返回值中标识的事件有用户event关心的事件发生。
// !ep_is_linked(&epi->rdllink):epi的ready队列中有数据。ep_is_linked用于判断队列是否为空。
/* 如果要监视的文件状态已经就绪并且还没有加入到就绪队列中,则将当前的
epitem加入到就绪队列中.如果有进程正在等待该文件的状态就绪,则
唤醒一个等待的进程。 */
if((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink,&ep->rdllist); //将当前epi插入到ep->ready队列中。
/* 如果有进程正在等待文件的状态就绪,
也就是调用epoll_wait睡眠的进程正在等待,
则唤醒一个等待进程。
waitqueue_active(q) 等待队列q中有等待的进程返回1,否则返回0。
*/
if(waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq,TAKS_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
/* 如果有进程等待eventpoll文件本身(???)的事件就绪,
则增加临时变量pwake的值,pwake的值不为0时,
在释放lock后,会唤醒等待进程。 */
if(waitqueue_active(&ep->poll_wait))
pwake++;
}