epoll是Linux引以為榮的技術,因為相對於select和poll有很大的性能改進。本文主要介紹epoll的實現原理,了解epoll高效背後的魔法。
epoll的使用簡介
1. epoll_create
使用epoll時需要使用epoll_create()創建一個epoll的文件句柄,epoll_create()函數的原型如下:
\tintepoll_create(int size);
此接口用於創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。
2. epoll_ctl
使用epoll_ctl()可以向epoll句柄添加或者刪除要監聽的文件句柄。epoll_ctl()函數原型如下:
\tintepoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
要監聽文件是否可讀寫時先要向epoll句柄註冊要監聽的事件類型。第一個參數是epoll_create()返回的epoll句柄,第二個參數表示動作,用三個宏來表示:
- EPOLL_CTL_ADD:註冊新的fd到epfd中。
- EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件。
- EPOLL_CTL_DEL:從epfd中刪除一個fd。
第三個參數是需要監聽的文件句柄,第四個參數是告訴內核需要監聽什麼事。
3. epoll_wait
萬事俱備,現在只需要調用epoll_wait()函數就可以開始等待事件發生了。epoll_wailt()函數的原型如下:
\tintepoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個 maxevents的值不能大於創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。
epoll實現原理
前面介紹了epoll的使用,接下來主要介紹epoll在內核的實現原理。
當我們在用戶態調用epoll_create()時,會觸發調用內核的sys_epoll_create()。我們先來看看sys_epoll_create()這個函數:
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
structeventpoll *ep = NULL;
structfile *file;
...
error = ep_alloc(&ep);
...
fd = get_unused_fd_flags(O_RDWR| (flags & O_CLOEXEC));
...
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags &O_CLOEXEC));
...
fd_install(fd,file);
\tep->file = file;
\t
return fd;
}
這個函數主要調用ep_alloc()申請一個eventpoll結構,eventpoll結構是epoll的核心數據結構,我們來看看這個結構的定義:
structeventpoll {
spinlock_t lock;
structmutexmtx;
wait_queue_head_t wq;
wait_queue_head_t poll_wait;
structlist_headrdllist;
structrb_rootrbr;
structepitem *ovflist;
structuser_struct *user;
structfile *file;
intvisited;
structlist_headvisited_list_link;
};
eventpoll結構有三個欄位是比較重要的,分別是:wq、rdllist和rbr。
- wq用於保存有哪些進程在等待這個epoll返回。
- rdllist用於保存可讀寫的文件。
- rbr用於建立一個快速查找文件句柄的紅黑樹。
創建完eventpoll結構後,sys_epoll_create()會調用get_unused_fd_flags()獲取一個空閒的文件句柄fd,接著調anon_inode_getfile()獲取一個空閒的file結構,並且把eventpoll結構與file結構綁定。最後調用fd_install()把文件句柄fd與file結構綁定,返迴文件句柄fd。通過一系列的操作後,內核就可以通過文件句柄fd與eventpoll結構進行關聯。
根據epoll的使用流程,使用epoll_create()創建epoll句柄後,可以通過epoll_ctl()函數向epoll句柄添加和刪除要監視的文件句柄。調用epoll_ctl()會觸發內核調用sys_epoll_ctl()函數,我們來看看sys_epoll_ctl()函數的最重要部分:
SYSCALL_DEFINE4(epoll_ctl,int, epfd, int, op, int, fd,
struct epoll_event __user *,event)
{
...
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds,tfile, fd);
} else
error = -EEXIST;
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi,&epds);
} else
error = -ENOENT;
break;
}
...
return error;
}
sys_epoll_ctl()會根據我們傳遞的op參數來進行不同的操作,我們主要看看op為EPOLL_CTL_ADD的操作,也就是添加操作。當進行添加操作時,sys_epoll_ctl()最終會調用ep_insert()把文件句柄fd添加到eventpoll結構維護的紅黑樹中,ep_insert()代碼如下:
static int ep_insert(struct eventpoll *ep, struct epoll_event*event,
struct file *tfile, int fd)
{
int error, revents, pwake =0;
unsignedlong flags;
structepitem *epi;
structep_pqueue epq;
...
if (!(epi =kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
...
init_poll_funcptr(&epq.pt,ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile,&epq.pt);
...
ep_rbtree_insert(ep, epi);
...
return 0;
}
ep_insert()函數首先創建一個epitem結構用於管理事件,然後調用文件句柄的poll()接口,根據init_poll_funcptr(&epq.pt,ep_ptable_queue_proc)這行代碼的設置,poll()接口最終會調用ep_ptable_queue_proc()函數。ep_ptable_queue_proc()函數代碼如下:
static void ep_ptable_queue_proc(struct file *file,wait_queue_head_t *whead, poll_table*pt)
{
structepitem *epi = ep_item_from_epqueue(pt);
structeppoll_entry *pwq;
if (epi->nwait >= 0
\t&& (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL)))
\t{
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
epi->nwait = -1;
}
}
ep_ptable_queue_proc()函數的作用就是把epitem結構添加到文件的等待隊列中,根據上面的代碼可知,當等待隊列被喚醒的時候,將會調用ep_poll_callback()函數。而ep_poll_callback()函數的作用就是把epitem結構放置到eventpoll結構的rdllist隊列中。我們前面分析過,rdllist就是就緒的文件隊列。ep_poll_callback()函數最終會調用wake_up_locked(&ep->wq)喚醒進程。簡化後的代碼如下:
static int ep_poll_callback(wait_queue_t *wait,
\tunsigned mode, int sync, void *key)
{
...
// 把epitem添加到rdllist隊列中
if(!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink,&ep->rdllist);
// 喚醒進程
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
...
return 1;
}
ep_insert()函數最後一個操作就是調用ep_rbtree_insert()把epitem結構添加的eventpoll結構的紅黑樹中。如下圖:
使用紅黑樹管理epitem結構的目的是可以根據文件句柄fd快速查找到對應的epitem結構。紅黑樹是一棵平衡二叉樹,時間複雜度為O(logN)。
添加文件句柄到epoll之後,就可以調用epoll_wait()函數開始監聽文件。epoll_wait()會調用內核的sys_epoll_wait()函數,而sys_epoll_wait()最終會調用ep_poll()函數,代碼如下:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user*events,
int maxevents, long timeout)
{
...
if (list_empty(&ep->rdllist)) {
init_waitqueue_entry(&wait,current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq,&wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&ep->rdllist)|| !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock,flags);
}
__remove_wait_queue(&ep->wq,&wait);
set_current_state(TASK_RUNNING);
}
...
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents))&& jtimeout)
goto retry;
return res;
}
ep_poll()函數所做的事情很簡單,就是把當前進程設置為可中斷睡眠狀態,然後添加eventpoll結構的等待隊列中,最後調用schedule_timeout()讓出CPU。這樣當前進程就會進入睡眠狀態,當進程醒來的時候會判斷eventpoll結構的rdllist隊列是否為空,然後不為空就調用ep_send_events()函數把可讀寫的文件拷貝到用戶態的events數組中。
那麼什麼時候當前進程會被喚醒呢?在分析ep_insert()函數的時候,我們提及過當文件狀態發生改變時會調用ep_poll_callback()函數,而ep_poll_callback()函數會把就緒的文件添加到rdllist隊列,並且就會把當前進程喚醒。
總結
本文主要分析了epoll的實現原理,可以知道,epoll並不會對所有文件進行掃描(而select和poll會對所以文件進行掃描),而是使用事件的方式把就緒的文件收集起來,所以epoll的效率非常高。
當然本文還有一些epoll的細節並沒有介紹到,例如水平觸發和邊緣觸發等,有興趣可以自己研究代碼。