select/poll 源码分析

select/poll模型一直是Linux长期以来使用较多的服务器模型,自从2.6内核引入epoll模型以后,我们又有了一种更加有效的服务器模型,一直以来都耳闻epoll模型对比select模型的优点,赞誉之声一片,他如何高效,高效到什么程度,还是一知半解,在之前的一篇博客,我简要分析了select模型代码,的确正如前人所言,使用轮询方式处理连接,这里我讲分析epoll模型的框架,体会其高效。

这里先简略说明一下epoll的使用流程,其使用或许比select模型更为简洁:

epoll_create(size);

while (...)
{
	/* 等待就绪连接 */
	epoll_wait( ... );

	/* 如有新连接,构造epoll_event结构体后 */
	epoll_ctl( ... EPOLL_CTL_ADD ... );
	/* 如有断开连接 */
	epoll_ctl( ... EPOLL_CTL_DEL ... );
}

从使用角度出发,先来看epoll_create函数的实现,用户态声明int epoll_create(int size);对应的系统调用代码:

:::
SYSCALL_DEFINE1(epoll_create, int, size)
{
	if (size

可以看到,sys_epoll_create仅仅检测size是否小于0,之后直接调用sys_epoll_create1,也就是说size在2.6.34版本中是没有使用的,通过查阅代码,发现sys_epoll_create1是在2.6.27版本中加入的,但是之前的几个版本也仅仅只是输出日志时使用,没有回溯查看真正废弃改值是在哪个版本,总之真正的实现是位于sys_epoll_create1

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
	int error;
	struct eventpoll *ep = NULL;

	/* Check the EPOLL_* constant for consistency.  */
	BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

	if (flags & ~EPOLL_CLOEXEC)
		return -EINVAL;
	/*
	 * Create the internal data structure ("struct eventpoll").
	 */
	error = ep_alloc(&ep);
	if (error < 0)
		return error;
	/*
	 * Creates all the items needed to setup an eventpoll file. That is,
	 * a file structure and a free file descriptor.
	 */
	error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
				 O_RDWR | (flags & O_CLOEXEC));
	if (error < 0)
		ep_free(ep);

	return error;
}

代码结构比较清晰,调用ep_alloc分配一个eventpoll结构,调用anon_inode_getfd创建一个文件节点和文件描述符,并返回文件描述符,这个文件描述符供epoll自己使用。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	int error;
	struct file *file, *tfile;
	struct eventpoll *ep;
	struct epitem *epi;
	struct epoll_event epds;

	...
	/* 从file结构私有数据取出ep_alloc分配的结构 */
	ep = file->private_data;

	mutex_lock(&ep->mtx);

	/*
	 * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
	 * above, we can be sure to be able to use the item looked up by
	 * ep_find() till we release the mutex.
	 */
	epi = ep_find(ep, tfile, fd);

	error = -EINVAL;
	switch (op) {
	case EPOLL_CTL_ADD:
		if (!epi) {
			epds.events |= POLLERR | POLLHUP;
			error = ep_insert(ep, &epds, tfile, fd);
		} else
			error = -EEXIST;
		break;
	case EPOLL_CTL_DEL:
		if (epi)
			error = ep_remove(ep, epi);
		else
			error = -ENOENT;
		break;
	case EPOLL_CTL_MOD:
		if (epi) {
			epds.events |= POLLERR | POLLHUP;
			error = ep_modify(ep, epi, &epds);
		} else
			error = -ENOENT;
		break;
	}
	mutex_unlock(&ep->mtx);

error_tgt_fput:
	fput(tfile);
error_fput:
	fput(file);
error_return:

	return error;
}

去除错误检测,剩下的代码也比较清晰,首先取出epoll_create1分配的eventpoll结构ep,然后使用ep_find在ep中查找当前操作的文件描述符,接下来有个判断,分不同操作进行,如果是EPOLL_CTL_ADD,则调ep_insert插入文件描述符,如果是EPOLL_CTL_DEL则调用ep_remove删除文件描述符,修改则用ep_modify

姑且先忽略ep操作的函数,直接去看epoll_wait的实现,以明确流程。 epoll_wait实现与1321行,不过其中所作操作非常简单:从file结构中取出eventpoll结构,直接调用ep_poll,我们需要着重分析ep_poll

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	int res, eavail;
	unsigned long flags;
	long jtimeout;
	wait_queue_t wait;

	...

	res = 0;
	if (list_empty(&ep->rdllist)) {
		/*
		 * We don't have any available event to return to the caller.
		 * We need to sleep here, and we will be wake up by
		 * ep_poll_callback() when events will become available.
		 */
		init_waitqueue_entry(&wait, current);
		wait.flags |= WQ_FLAG_EXCLUSIVE;
		__add_wait_queue(&ep->wq, &wait);

		for (;;) {
			/*
			 * We don't want to sleep if the ep_poll_callback() sends us
			 * a wakeup in between. That's why we set the task state
			 * to TASK_INTERRUPTIBLE before doing the checks.
			 */
			set_current_state(TASK_INTERRUPTIBLE);
			if (!list_empty(&ep->rdllist) || !jtimeout)
				break;
			if (signal_pending(current)) {
				res = -EINTR;
				break;
			}

			spin_unlock_irqrestore(&ep->lock, flags);
			jtimeout = schedule_timeout(jtimeout);
			spin_lock_irqsave(&ep->lock, flags);
		}
		__remove_wait_queue(&ep->wq, &wait);

		set_current_state(TASK_RUNNING);
	}

	...

	if (!res && eavail &&
	    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
		goto retry;

	return res;
}

由于篇幅限制去除一些不影响大致流程的代码,剩下的居然异常简单,首先用list_empty判断ep->rdllist链表是否为空,rdllist是就绪的文件链表,如果不为空可以直接返回给上层,如果不为空,则等待,等待的方式也比较清晰,把自己加入到ep->wq等待队列中,然后schedule_timeout(jtimeout);放弃运行权即可,完成后使用ep_send_events把结果返回给上层。

原本以为epoll会同select一样,把核心代码放到wait函数中,想不到此函数草草了事,除了查看链表是否为空和休息,基本什么也没有做,看来epoll之所以比select效率高的原因尽在于此。但现在的问题是,谁去填充队列rdllist,谁去唤醒该进程?看来还得从之前忽略了的ep_insert函数入手。

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
	int error, revents, pwake = 0;
	unsigned long flags;
	struct epitem *epi;
	struct ep_pqueue epq;

	if (unlikely(atomic_read(&ep->user->epoll_watches) >=
		     max_user_watches))
		return -ENOSPC;
	if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
		return -ENOMEM;

	/* Item initialization follow here ... */
	INIT_LIST_HEAD(&epi->rdllink);
	INIT_LIST_HEAD(&epi->fllink);
	INIT_LIST_HEAD(&epi->pwqlist);
	epi->ep = ep;
	ep_set_ffd(&epi->ffd, tfile, fd);
	epi->event = *event;
	epi->nwait = 0;
	epi->next = EP_UNACTIVE_PTR;

	/* Initialize the poll table using the queue callback */
	epq.epi = epi;
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

	/*
	 * Attach the item to the poll hooks and get current event bits.
	 * We can safely use the file* here because its usage count has
	 * been increased by the caller of this function. Note that after
	 * this operation completes, the poll callback can start hitting
	 * the new item.
	 */
	revents = tfile->f_op->poll(tfile, &epq.pt);

	/*
	 * We have to check if something went wrong during the poll wait queue
	 * install process. Namely an allocation for a wait queue failed due
	 * high memory pressure.
	 */
	error = -ENOMEM;
	if (epi->nwait < 0) 		goto error_unregister; 	/* Add the current item to the list of active epoll hook for this file */ 	spin_lock(&tfile->f_lock);
	list_add_tail(&epi->fllink, &tfile->f_ep_links);
	spin_unlock(&tfile->f_lock);

	/*
	 * Add the current item to the RB tree. All RB tree operations are
	 * protected by "mtx", and ep_insert() is called with "mtx" held.
	 */
	ep_rbtree_insert(ep, epi);

	/* We have to drop the new item inside our item list to keep track of it */
	spin_lock_irqsave(&ep->lock, flags);

	/* If the file is already "ready" we drop it inside the ready list */
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);

		/* Notify waiting tasks that events are available */
		if (waitqueue_active(&ep->wq))
			wake_up_locked(&ep->wq);
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}

	spin_unlock_irqrestore(&ep->lock, flags);

	atomic_inc(&ep->user->epoll_watches);

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&ep->poll_wait);

	return 0;

error_unregister:
	...	/* 清理代码 */
	return error;
}

这里首先使用 kmem_cache_alloc 分配了一个 epitem *epi 结构,epi->ep = ep;并把ep指针赋值给它的成员,紧接着,又使用epq.epi = epi;把epi赋值给ep_pqueue epq;结构成员。这里嵌套了几层结构,ep->epi->epq,关系比较复杂,我们只关心核心的东西,分析流程。

后面init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);函数定义于include/linux/poll.h文件,代码如下

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
	pt->qproc = qproc;
	pt->key   = ~0UL; /* all events enabled */
}

仅仅是给epq的pt成员设置回调函数,千万注意这里的回调应用,epoll的实现就是依赖于内核和驱动的层层回调,后面将会说到。

再来看ep_insert的代码,之后紧接着调用了目标文件驱动的poll回调,把刚才设置好的epq.pt传入驱动之中:tfile->f_op->poll(tfile,&epq.pt)。如果再这里停止跟进,往下进行,发现该函数之后的代码无非是错误判断或者各种结构之间的纠结。唯一一点有用的代码就是这句:wake_up_locked(&ep->wq)。其中ep->wq正是epoll_wait函数等待的队列,不过此情况仅仅是在驱动poll函数直接返回数据的情况下。真正触发点应该还在驱动对epq.pt的操作。

之前的文章写过驱动中对poll的实现,无非调用 poll_wait 函数,实现如下:

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && wait_address)
	p->qproc(filp, wait_address, p);
}

对照init_poll_funcptr的代码,我们可以很清楚的发现,其实最终驱动是回调了init_poll_funcptr中设置的qproc函数!返回到ep_insert中,可以看到设置的qproc函数名为 ep_ptable_queue_proc,所以还是不得不再分析此函数。

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	struct epitem *epi = ep_item_from_epqueue(pt);
	struct eppoll_entry *pwq;

	if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
		init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
		pwq->whead = whead;
		pwq->base = epi;
		add_wait_queue(whead, &pwq->wait);
		list_add_tail(&pwq->llink, &epi->pwqlist);
		epi->nwait++;
	} else {
		/* We have to signal that an error occurred */
		epi->nwait = -1;
	}
}

乍一看函数代码并不多,不过又创建出一个结构eppoll_entry,并调用init_waitqueue_func_entry初始化等待的回调函数。再使用pwq->base = epi保存了epi地址,接下来执行正常的add_wait_queue操作,加入等待队列。期待的结果就是设备数据就绪时,驱动wake_up等待队列,可以回调加入的函数ep_poll_callback,要想进一步研究,可以查看相关代码:

/* 函数init_waitqueue_func_entry仅仅初始化func回调函数 */
static inline void init_waitqueue_func_entry(wait_queue_t *q,
				wait_queue_func_t func)
{
	q->flags = 0;
	q->private = NULL;
	q->func = func;
}

#define wake_up(x)                      __wake_up(x, TASK_NORMAL, 1, NULL)

void __wake_up(wait_queue_head_t *q, unsigned int mode,
			int nr_exclusive, void *key)
{
	unsigned long flags;

	spin_lock_irqsave(&q->lock, flags);
	__wake_up_common(q, mode, nr_exclusive, 0, key);
	spin_unlock_irqrestore(&q->lock, flags);
}

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key)
{
	wait_queue_t *curr, *next;

	list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
		unsigned flags = curr->flags;

		/* 最终到达这里调用刚才设置的回调函数 */
		if (curr->func(curr, mode, wake_flags, key) &&
				(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
			break;
	}
}

现在可以知道的是,epoll_wait处于等待并且设备准备完成,会调用ep_poll_callback函数,一切的矛头都指向ep_poll_callback,大致可以猜测,增加就绪队列rdllist元素、唤醒epoll_wait函数都应在此函数中实现。

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	int pwake = 0;
	unsigned long flags;
	struct epitem *epi = ep_item_from_wait(wait);
	struct eventpoll *ep = epi->ep;

	spin_lock_irqsave(&ep->lock, flags);

	...

	/* If this file is already in the ready list we exit soon */
	if (!ep_is_linked(&epi->rdllink))
		list_add_tail(&epi->rdllink, &ep->rdllist);

	/*
	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
	 * wait list.
	 */
	if (waitqueue_active(&ep->wq))
		wake_up_locked(&ep->wq);
	if (waitqueue_active(&ep->poll_wait))
		pwake++;

out_unlock:
	spin_unlock_irqrestore(&ep->lock, flags);

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&ep->poll_wait);

	return 1;
}

粗略看下代码感觉此函数不负众望,首先struct epitem *epi = ep_item_from_wait(wait)从wait中获取epi结构,并取得ep,list_add_tail(&epi->rdllink, &ep->rdllist);把加入到ep的就绪队列rdllist中,而后wake_up_locked(&ep->wq)唤醒epoll_wait

这里还有些细节,如果记得清楚,刚才是把wait变量赋值到结构epi的成员,这里确从wait中取出结构体的地址,这是Linux内核常用手段,依赖于宏container_of,此宏定义于include/linux/kernel.h,代码如下:

#define container_of(ptr, type, member) ({                      \
        const typeof( ((type *)0)->member ) *__mptr = (ptr);    \
        (type *)( (char *)__mptr - offsetof(type,member) );})

对c语言来说,结构体并没有太多的秘密,无非是内存的组合,typeof( ((type *)0)->member ) 在编译期构造一个member成员类型,指针指向ptr,之后减去member在类型type中的偏移即为type结构地址。

分析到这里,我们可以看到,epoll模型并没有对每一个文件描述符遍历,仅仅当描述符对应设备就绪时,回调epoll,加入就绪队列唤醒epoll_wait,当文件描述符增多时,效率不会同select一样线性下降,由于select会轮询所有设备,可能大多数设备是没有数据的,在文件描述符较少情况下倒是显不出什么,当有成千上万描述符并且大量描述符处于空闲状态时,对效率的损耗是相当惊人的。所以epoll为我们提供了一种更加有效便捷的方式去管理如此多的描述符。

Built with Hugo
主题 StackJimmy 设计