当前位置 博文首页 > changke的博客:I/O多路复用深度解析

    changke的博客:I/O多路复用深度解析

    作者:[db:作者] 时间:2021-07-31 12:24

    这两天刷算法,感觉自己凉了,不看人家的答案自己连题都看不懂。天赋吧,这没得救了。于是决定先把算法放一下,找点比较有意思的事情做做,所以就突发奇想不如看一下epoll源码吧,整天用这个东西,了解一下他实现有益无害吧!


    在网络编程中,有常用的IO复用有三个,对于不同的场景,各显神通。即select,poll,epoll,当然select和poll不是该篇文章的主角,下面我主要的是epoll的详细实现原理。当然select和poll我将作对比性的描述。

    下面我们思考一个问题:

    • 为什么存在I/O复用,使用IO复用和不使用在单进程中有什么影响?

    这个问题在网上没找到答案,可能这个“为什么”提出来并没有什么意义。其实答案比较简单,多路复用是在单个线程中,实现跟踪多socket套接字状态变化的一种机制。在不使用IO复用的情况下,没法实现单线程检测多个套接字状态变化。(以上是个人见解,仅供参考)

    我们都使用select,poll,epoll这些系统调用来实现IO复用机制。对于前两者,每次只要有就绪事件,要将所有fd传递给select或poll系统调用,这意味着将所有的fd从用户态拷贝到内核态。这就多余了一些没有意义的拷贝,因为有些描述符上还没有IO事件的发生,只因为其他的描述符有IO事件就被拷贝到内核态。

    再一个有必要区分一下select和poll,select系统调用通过一个位数组fd_bits收集要监听的描述符,内核中的宏定义限制了最大可监听的描述符数量为1024个。

    而对于poll没有限制监听的数量,只要来一个新连接,就将这个连接加到poll维护的描述符链表中。

    对于以上两种IO复用机制都没有解决避免复制非就绪连接到内核的无效操作。对于连接较少的状况,以上三种IO复用机制效率相当,但要是连接数量上升,程序中分别使用三者的性能差异就显而易见了。

    epoll为什么高效快捷?

    epoll相关的系统调用有:epoll_create, epoll_ctl和epoll_wait。

    在Linux系统中,epoll想内核注册了一个epoll文件系统,用来存储被监控的文件描述符。

    当epoll_create时,就会在epoll文件系统创建file节点,只服务于epoll。当epoll在内核中被初始化时,操作系统回为epoll开辟出一个内核高速缓冲区(连续的物理内存页),用来保存保存红黑树结构

    static int __init eventpoll_init(void)
    {
        mutex_init(&pmutex);
        ep_poll_safewake_init(&psw);
        //开辟slab高速缓冲区
        epi_cache = kmem_cache_create(“eventpoll_epi”,sizeof(struct epitem),
                0,SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|
                SLAB_PANIC,NULL);
        pwq_cache = kmem_cache_create(“eventpoll_pwq”,sizeof(struct
                 eppoll_entry),0,EPI_SLAB_DEBUG|SLAB_PANIC,NULL);
        
        return 0;
    }
    

    在刚开始学习epoll的时候就听过epoll底层的实现核心数据结构就是红黑树,红黑树是一种自平衡二叉树,在查找和删除效率较高。

    在调用epoll_create后,epoll底层实现会在epoll的高速缓冲区中创建一个红黑树用来接收后面到来的连接,再者就是会创建一个就绪链表,保存发生就绪事件的fd:

    //当来一个新连接,基于slab高速缓存(可以理解为一种分配内存的机制,对于频繁的使用和销毁特别的高效),创建一个epitem对象,保存下面描述的信息,然后将这个对象加到红黑树中。
    
    struct epitem {
    
        struct rb_node  rbn;        //用于主结构管理的红黑树
    
        struct list_head  rdllink;  //事件就绪队列
    
        struct epitem  *next;       //用于主结构体中的链表
    
     struct epoll_filefd  ffd;   //这个结构体对应的被监听的文件描述符信息
    
     int  nwait;                 //poll操作中事件的个数
    
        struct list_head  pwqlist;  //双向链表,保存着被监视文件的等待队列,功能类似于select/poll中的poll_table
    
        struct eventpoll  *ep;      //该项属于哪个主结构体(多个epitm从属于一个eventpoll)
    
        struct list_head  fllink;   //双向链表,用来链接被监视的文件描述符对应的struct file。因为file里有f_ep_link,用来保存所有监视这个文件的epoll节点
    
        struct epoll_event  event;  //注册的感兴趣的事件,也就是用户空间的epoll_event
    }
    
    

    在这里插入图片描述

    
    //每个epoll fd(epfd)对应的主要数据结构为:
    
    struct eventpoll {
    
      spin_lock_t       lock;        //对本数据结构的访问
    
      struct mutex      mtx;         //防止使用时被删除
    
      wait_queue_head_t     wq;      //sys_epoll_wait() 使用的等待队列
    
      wait_queue_head_t   poll_wait;       //file->poll()使用的等待队列
    
      struct list_head    rdllist;        //事件满足条件的链表
    
      struct rb_root      rbr;            //用于管理所有fd的红黑树(树根)
    
      struct epitem      *ovflist;       //将事件到达的fd进行链接起来发送至用户空间
    
    }
    
    //内核通过创建一个fd与这个结构相关联,管理红黑树结构,即epoll_create1实现
    
      SYSCALL_DEFINE1(epoll_create1, int, flags)
      {
      	int error, fd;
      	//
      	struct eventpoll *ep = NULL;
      	struct file *file;
       
      	/* Check the EPOLL_* constant for consistency.  */
      	BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
       
      	if (flags & ~EPOLL_CLOEXEC)
      		return -EINVAL;
      	/*
      	 * Create the internal data structure ("struct eventpoll").
      	 */
      	error = ep_alloc(&ep);
      	if (error < 0)
      		return error;
      	/*
      	 * Creates all the items needed to setup an eventpoll file. That is,
      	 * a file structure and a free file descriptor.
      	 */
      	 
      	fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
      	if (fd < 0) {
      		error = fd;
      		goto out_free_ep;
      	}
      	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
      				 O_RDWR | (flags & O_CLOEXEC));
      	if (IS_ERR(file)) {
      		error = PTR_ERR(file);
      		goto out_free_fd;
      	}
      	
      	ep->file = file;
      	fd_install(fd, file);
      	return fd;
       
      out_free_fd:
      	put_unused_fd(fd);
      out_free_ep:
      	ep_free(ep);
      	return error;
      }
       //创建并初始化eventpoll结构体实例,由ep_alloc实现(第52行)可知,调用kzalloc在内核空间申请内存并初始化清零。
      static int ep_alloc(struct eventpoll **pep)
      {
      	int error;
      	struct user_struct *user;
      	struct eventpoll *ep;
       
      	user = get_current_user();
      	error = -ENOMEM;
      	ep = kzalloc(sizeof(*ep), GFP_KERNEL);
      	if (unlikely(!ep))
      		goto free_uid;
       
      	spin_lock_init(&ep->lock);
      	mutex_init(&ep->mtx);
      	init_waitqueue_head(&ep->wq);
      	init_waitqueue_head(&ep->poll_wait);
      	INIT_LIST_HEAD(&ep->rdllist);
      	ep->rbr = RB_ROOT;
      	ep->ovflist = EP_UNACTIVE_PTR;
      	ep->user = user;
       
      	*pep = ep;
      	return 0;
      free_uid:
      	free_uid(user);
      	return error;
      }
    

    当红黑树中有IO事件时,会通过红黑树的删除操作将节点取下放到就绪事件链表rdlist中(也就是将就绪事件从红黑树中取出放到就绪链表中),然后通过epoll_wait返回到用户进程,在进程中进行逻辑处理或者又进行IO处理,所以对于epoll来说只需关心就绪链表中的描述符。不必像poll和select一样只要有个别的fd发生IO活动就遍历整个集合或者链表。

    • 还有就是网上所说的epoll还使用了mmap,实现用户空间和内核空间的共享?

    通过前面的例子发现这一说法是站不住脚的,通过源码我们可以看出epoll高速缓存是存在的,他就是用来保存红黑树结构的,来一个新连接,加到红黑树中进行检测,这些已经在逻辑上很合理了,如果存在用户空间和内核空间共享内存的话,那岂不是画蛇添足?所以不论是epoll fd本身或者epitem都是基于内核内存分配创建,不存在mmap之说。

    下面代码主要是往红黑树中添加节点EPOLL_CTL_ADD,还有就是取出就绪事件的过程:

       static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)
    
    {
    
       int error ,revents,pwake = 0;
    
       unsigned long flags ;
    
       struct epitem *epi;
    
       /*
    
          struct ep_queue{
    
             poll_table pt;
    
             struct epitem *epi;
    
          }   */
    
     
    
       struct ep_pqueue epq;
    
     
    
       //分配一个epitem结构体来保存每个加入的fd
    
       if(!(epi = kmem_cache_alloc(epi_cache,GFP_KERNEL)))
    
          goto error_return;
    
       //初始化该结构体
    
       ep_rb_initnode(&epi->rbn);
    
       INIT_LIST_HEAD(&epi->rdllink);
    
       INIT_LIST_HEAD(&epi->fllink);
    
       INIT_LIST_HEAD(&epi->pwqlist);
    
       epi->ep = ep;
    
       ep_set_ffd(&epi->ffd,tfile,fd);
    
       epi->event = *event;
    
       epi->nwait = 0;
    
       epi->next = EP_UNACTIVE_PTR;
    
     
    
       epq.epi = epi;
    
       //安装poll回调函数
    
       init_poll_funcptr(&epq.pt, ep_ptable_queue_proc );
    
       /* 调用poll函数来获取当前事件位,其实是利用它来调用注册函数ep_ptable_queue_proc(poll_wait中调用)。
    
           如果fd是套接字,f_op为socket_file_ops,poll函数是
    
           sock_poll()。如果是TCP套接字的话,进而会调用
    
           到tcp_poll()函数。此处调用poll函数查看当前
    
           文件描述符的状态,存储在revents中。
    
           在poll的处理函数(tcp_poll())中,会调用sock_poll_wait(),
    
           在sock_poll_wait()中会调用到epq.pt.qproc指向的函数,
    
           也就是ep_ptable_queue_proc()。  */ 
    
     
    
       revents = tfile->f_op->poll(tfile, &epq.pt);
    
     
    
       spin_lock(&tfile->f_ep_lock);
    
       list_add_tail(&epi->fllink,&tfile->f_ep_lilnks);
    
       spin_unlock(&tfile->f_ep_lock);
    
     
    
       ep_rbtree_insert(ep,epi); //将该epi插入到ep的红黑树中
    
     
    
       spin_lock_irqsave(&ep->lock,flags);
    
     
    
    //  revents & event->events:刚才fop->poll的返回值中标识的事件有用户event关心的事件发生。
    
    // !ep_is_linked(&epi->rdllink):epi的ready队列中有数据。ep_is_linked用于判断队列是否为空。
    
    /*  如果要监视的文件状态已经就绪并且还没有加入到就绪队列中,则将当前的
    
        epitem加入到就绪队列中.如果有进程正在等待该文件的状态就绪,则
    
        唤醒一个等待的进程。  */ 
    
     
    
    if((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
    
          list_add_tail(&epi->rdllink,&ep->rdllist); //将当前epi插入到ep->ready队列中。
    
    /* 如果有进程正在等待文件的状态就绪,
    
    也就是调用epoll_wait睡眠的进程正在等待,
    
    则唤醒一个等待进程。
    
    waitqueue_active(q) 等待队列q中有等待的进程返回1,否则返回0。
    
    */
    
     
    
          if(waitqueue_active(&ep->wq))
    
             __wake_up_locked(&ep->wq,TAKS_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
    
     
    
    /*  如果有进程等待eventpoll文件本身(???)的事件就绪,
    
               则增加临时变量pwake的值,pwake的值不为0时,
    
               在释放lock后,会唤醒等待进程。 */ 
    
     
    
          if(waitqueue_active(&ep->poll_wait))
    
             pwake++;
    
       }