select/poll has long been the most widely used server model on Linux. Since the 2.6 kernel introduced epoll we have had a more efficient alternative, and its advantages over select are praised everywhere, yet how it achieves that efficiency, and how large the gain really is, tends to stay half understood. In an earlier post I briefly analyzed the select implementation, which does indeed handle connections by polling, just as everyone says; here I will analyze the framework of the epoll implementation to see where its efficiency comes from.
First, a brief outline of how epoll is used; the flow is arguably even simpler than select's:
epoll_create(size);
while (...)
{
/* wait for ready connections */
epoll_wait( ... );
/* on a new connection, fill in an epoll_event structure, then */
epoll_ctl( ... EPOLL_CTL_ADD ... );
/* on a closed connection */
epoll_ctl( ... EPOLL_CTL_DEL ... );
}
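To make the flow concrete, here is a minimal user-space sketch of that loop. It is only a sketch: error handling is omitted, handle_read() is a placeholder, and listen_fd is assumed to be an already-bound, listening, non-blocking socket set up elsewhere.
/* Minimal sketch of the epoll usage flow described above. */
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_EVENTS 64

void event_loop(int listen_fd)
{
	struct epoll_event ev, events[MAX_EVENTS];
	int epfd = epoll_create(1024);	/* size is effectively ignored, see below */

	ev.events = EPOLLIN;
	ev.data.fd = listen_fd;
	epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

	for (;;) {
		/* block until at least one registered fd is ready */
		int n = epoll_wait(epfd, events, MAX_EVENTS, -1);

		for (int i = 0; i < n; i++) {
			int fd = events[i].data.fd;

			if (fd == listen_fd) {
				/* new connection: register it with the epoll instance */
				int conn = accept(listen_fd, NULL, NULL);
				ev.events = EPOLLIN;
				ev.data.fd = conn;
				epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &ev);
			} else if (events[i].events & (EPOLLHUP | EPOLLERR)) {
				/* peer closed or error: unregister and close */
				epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
				close(fd);
			} else {
				/* data ready: handle_read(fd) would process it */
			}
		}
	}
}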
Starting from how it is used, let's first look at the implementation of epoll_create. User space declares it as int epoll_create(int size); the corresponding system call is:
SYSCALL_DEFINE1(epoll_create, int, size)
{
	if (size <= 0)
		return -EINVAL;

	return sys_epoll_create1(0);
}
As you can see, sys_epoll_create merely checks that size is positive and then calls sys_epoll_create1 directly; in other words, in 2.6.34 size is not actually used. Checking the history, sys_epoll_create1 was added in 2.6.27, and in the few versions before this one size was only ever used when printing a log message; I did not trace back to find exactly which version truly abandoned the value, but in any case the real implementation lives in sys_epoll_create1.
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error;
struct eventpoll *ep = NULL;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (error < 0)
ep_free(ep);
return error;
}
The structure here is clear: ep_alloc allocates an eventpoll structure, and anon_inode_getfd creates a file node and a file descriptor and returns that descriptor, which is the fd that refers to the epoll instance itself.
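A quick way to observe the effect of anon_inode_getfd from user space is to check what the returned descriptor resolves to under /proc. A small sketch follows; on the kernels discussed here the link typically reads "anon_inode:[eventpoll]", though the exact string may differ across versions.
/* Sketch: print what the fd returned by epoll_create1() points to in /proc. */
#include <sys/epoll.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	char path[64], target[128];
	int epfd = epoll_create1(0);
	ssize_t len;

	snprintf(path, sizeof(path), "/proc/self/fd/%d", epfd);
	len = readlink(path, target, sizeof(target) - 1);
	if (len > 0) {
		target[len] = '\0';
		printf("fd %d -> %s\n", epfd, target);	/* e.g. anon_inode:[eventpoll] */
	}
	close(epfd);
	return 0;
}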
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
...
/* fetch the structure allocated by ep_alloc from the file's private data */
ep = file->private_data;
mutex_lock(&ep->mtx);
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
With the error checking stripped away, the remaining code is again fairly clear: it first retrieves the eventpoll structure ep allocated by epoll_create1, then uses ep_find to look up the file descriptor being operated on inside ep, and then branches on the operation: EPOLL_CTL_ADD calls ep_insert to insert the descriptor, EPOLL_CTL_DEL calls ep_remove to delete it, and modification goes through ep_modify.
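Seen from user space, all three branches are driven by the same epoll_ctl call with a different op, and the error codes above come back to the caller through errno. A small illustrative wrapper (the helper names are invented for this example):
/* Illustrative wrappers around epoll_ctl(); names are made up for this sketch. */
#include <stdint.h>
#include <sys/epoll.h>

static int watch_fd(int epfd, int fd, uint32_t want, int add)
{
	struct epoll_event ev = { .events = want, .data.fd = fd };

	/* EPOLL_CTL_ADD fails with errno EEXIST if fd is already registered,
	 * mirroring the ep_find()/ep_insert() branch above; EPOLL_CTL_MOD
	 * fails with errno ENOENT if it is not. */
	return epoll_ctl(epfd, add ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ev);
}

static int unwatch_fd(int epfd, int fd)
{
	/* EPOLL_CTL_DEL sets errno to ENOENT when the fd was never added */
	return epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
}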
Let's set the ep_* helper functions aside for now and look straight at the implementation of epoll_wait to pin down the overall flow.
epoll_wait is implemented at line 1321 (of fs/eventpoll.c), but what it does is very simple: it pulls the eventpoll structure out of the file structure and calls ep_poll directly, so ep_poll is what we really need to analyze.
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
...
res = 0;
if (list_empty(&ep->rdllist)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq, &wait);
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
...
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
With some code that does not affect the overall flow trimmed for space, what remains is surprisingly simple. It first uses list_empty to check whether the ep->rdllist list is empty; rdllist is the list of files that are already ready, so if it is not empty the results can be returned to the caller immediately. If it is empty, the task waits, and the way it waits is equally clear: it adds itself to the ep->wq wait queue and gives up the CPU with schedule_timeout(jtimeout); once there is something to report, ep_send_events copies the results back up to the caller.
I had assumed epoll would put its core logic in the wait function, as select does, but this function wraps up almost casually: apart from checking whether the list is empty and sleeping, it does practically nothing, and that, it seems, is exactly where epoll's efficiency advantage over select comes from. The question now is: who fills the rdllist queue, and who wakes this process up? To answer that we have to go back to the ep_insert function we skipped earlier.
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;
if (unlikely(atomic_read(&ep->user->epoll_watches) >=
max_user_watches))
return -ENOSPC;
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
revents = tfile->f_op->poll(tfile, &epq.pt);
/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
error = -ENOMEM;
if (epi->nwait < 0)
	goto error_unregister;

/* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock);
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
ep_rbtree_insert(ep, epi);
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags);
/* If the file is already "ready" we drop it inside the ready list */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
error_unregister:
... /* cleanup code */
return error;
}
Here kmem_cache_alloc first allocates a struct epitem *epi, and epi->ep = ep; stores the ep pointer in one of its members; immediately afterwards epq.epi = epi; stores epi into a struct ep_pqueue epq. Several structures are nested here (ep -> epi -> epq) and the relationships get a bit tangled, so we will stick to the essentials and follow the flow.
The following call, init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);, is defined in include/linux/poll.h:
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->qproc = qproc;
pt->key = ~0UL; /* all events enabled */
}
All it does is install a callback into epq's pt member. Pay close attention to this use of callbacks: epoll's implementation is built on layer after layer of callbacks between the kernel core and the drivers, as we will see shortly.
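The poll_table being filled in here is itself minimal; in include/linux/poll.h of this kernel it is essentially just the callback installed above plus an event-mask key, roughly:
/* Rough shape of poll_table in this kernel generation: the qproc callback set
 * by init_poll_funcptr() and a key used to filter the events of interest. */
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *,
				struct poll_table_struct *);

typedef struct poll_table_struct {
	poll_queue_proc qproc;
	unsigned long key;
} poll_table;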
Back in ep_insert, the very next step calls the target file's poll operation in its driver, passing in the epq.pt we just set up: tfile->f_op->poll(tfile, &epq.pt). If we stop following that call here and read on, the rest of the function turns out to be little more than error checks and bookkeeping between the various structures. The one genuinely interesting line is wake_up_locked(&ep->wq), where ep->wq is exactly the queue that epoll_wait sleeps on; but that path is only taken when the driver's poll reports data as already available. The real trigger point must lie in what the driver does with epq.pt.
An earlier post covered how drivers implement poll: essentially they just call poll_wait, which is implemented as follows:
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
p->qproc(filp, wait_address, p);
}
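To make the chain concrete, a typical character-device poll method that ends up calling it might look like the sketch below. my_dev, my_dev_readable and read_wq are placeholders invented for this example, not taken from any real driver.
#include <linux/fs.h>
#include <linux/poll.h>
#include <linux/wait.h>

/* Hypothetical device state; only what the poll method needs is shown. */
struct my_dev {
	wait_queue_head_t read_wq;
	/* ... buffers, counters, ... */
};

static int my_dev_readable(struct my_dev *dev);	/* placeholder predicate */

/* Hypothetical driver poll method: register the caller on the device's wait
 * queue via poll_wait(), then report which events are ready right now. */
static unsigned int my_dev_poll(struct file *filp, poll_table *wait)
{
	struct my_dev *dev = filp->private_data;
	unsigned int mask = 0;

	/* for an epoll caller, this is where ep_ptable_queue_proc() gets run */
	poll_wait(filp, &dev->read_wq, wait);

	if (my_dev_readable(dev))
		mask |= POLLIN | POLLRDNORM;

	return mask;
}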
Comparing this with the code of init_poll_funcptr, it is clear that what the driver ultimately calls back into is the qproc function installed by init_poll_funcptr! Returning to ep_insert, the qproc installed there is ep_ptable_queue_proc, so that is the function we have to analyze next.
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
At first glance there is not much code here, but it creates yet another structure, an eppoll_entry, and calls init_waitqueue_func_entry to set up the wakeup callback. pwq->base = epi saves the address of epi, and then an ordinary add_wait_queue puts the entry on the device's wait queue. The expected outcome is that when the device has data ready, the driver will wake_up that wait queue and thereby invoke the callback we just installed, ep_poll_callback. To dig a bit deeper, the relevant code is:
/* init_waitqueue_func_entry merely initializes the func callback */
static inline void init_waitqueue_func_entry(wait_queue_t *q,
wait_queue_func_t func)
{
q->flags = 0;
q->private = NULL;
q->func = func;
}
#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL)
void __wake_up(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, 0, key);
spin_unlock_irqrestore(&q->lock, flags);
}
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
/* eventually control reaches here and the callback installed earlier is invoked */
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
So what we know now is that if epoll_wait is waiting and the device becomes ready, ep_poll_callback gets called. Everything points at ep_poll_callback, and we can reasonably guess that both adding an element to the ready queue rdllist and waking up epoll_wait are done in this function.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
spin_lock_irqsave(&ep->lock, flags);
...
/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 1;
}
A quick look at the code shows that this function lives up to expectations: struct epitem *epi = ep_item_from_wait(wait) recovers the epi structure from the wait entry and from it the ep, list_add_tail(&epi->rdllink, &ep->rdllist); adds epi to ep's ready queue rdllist, and wake_up_locked(&ep->wq) then wakes up epoll_wait.
There is a detail here worth noting: earlier the wait entry was embedded in a structure alongside the epi pointer, yet here the structure's address is recovered from the wait pointer itself. This is a standard Linux kernel technique built on the container_of macro, defined in include/linux/kernel.h as follows:
#define container_of(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offsetof(type,member) );})
From C's point of view a struct holds no great secret; it is just memory laid out in order. typeof(((type *)0)->member) obtains the member's type at compile time, __mptr is a pointer of that type set to ptr, and subtracting member's offset within type (offsetof) from it yields the address of the enclosing type structure.
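A tiny stand-alone (GCC) example makes the trick visible; the struct and field names here are invented purely for illustration:
/* Illustration of container_of: recover the enclosing struct from a pointer
 * to one of its members. Types and names are made up for this example. */
#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) ({			\
	const typeof(((type *)0)->member) *__mptr = (ptr);	\
	(type *)((char *)__mptr - offsetof(type, member)); })

struct entry {
	int id;
	int member;	/* plays the role of the embedded wait_queue_t */
};

int main(void)
{
	struct entry e = { .id = 42 };
	int *p = &e.member;			/* only the member's address is known */
	struct entry *back = container_of(p, struct entry, member);

	printf("recovered id = %d\n", back->id);	/* prints 42 */
	return 0;
}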
Having come this far, we can see that epoll never iterates over every file descriptor. Only when the device behind a descriptor becomes ready does it call back into epoll, adding the descriptor to the ready queue and waking up epoll_wait, so efficiency does not degrade linearly with the number of descriptors the way it does with select. select polls every device on every call, even though most of them may have no data; with only a few descriptors that hardly shows, but with tens of thousands of descriptors, most of them idle, the wasted work is staggering. epoll therefore gives us a much more efficient and convenient way to manage that many descriptors.