After the epoll object has been created in user space, sockets are registered with it along with the events of interest. This step is the core of epoll and lays the groundwork for the later epoll_wait call. Inside this system call a callback is installed on the socket, so that when an event becomes ready the epoll instance's event list is updated automatically without any involvement of the user process, which is one reason epoll is so efficient.
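Before diving into the kernel, here is a minimal user-space sketch of this step. It assumes sockfd is an already-created TCP socket; the function name register_socket is made up for illustration, and real code should check every return value.
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Create an epoll instance and register a socket on it, asking to be
 * notified when it becomes readable. Returns the epoll fd, or -1 on error. */
int register_socket(int sockfd)
{
    int epfd = epoll_create1(0);                 /* create the eventpoll object */
    if (epfd < 0) {
        perror("epoll_create1");
        return -1;
    }

    struct epoll_event ev = {
        .events  = EPOLLIN,                      /* interested in readable data */
        .data.fd = sockfd,                       /* echoed back by epoll_wait   */
    };

    /* This call enters do_epoll_ctl() and, for EPOLL_CTL_ADD, ep_insert(). */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) < 0) {
        perror("epoll_ctl");
        close(epfd);
        return -1;
    }
    return epfd;
}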
epoll_ctl
As usual, we start the analysis from the system-call entry point.
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct epoll_event epds;
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
return -EFAULT;
	// Register the event
return do_epoll_ctl(epfd, op, fd, &epds, false);
}
The real implementation is in the do_epoll_ctl function.
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct eventpoll *tep = NULL;
error = -EBADF;
	// Get the struct file for the epoll instance from epfd; its private_data is the eventpoll kernel object
f = fdget(epfd);
if (!f.file)
goto error_return;
/* Get the "struct file *" for the target file */
	// Get the file kernel object for the target fd (the socket)
tf = fdget(fd);
if (!tf.file)
goto error_fput;
	/* (elided: permission/loop checks and locking; see fs/eventpoll.c) */
	ep = f.file->private_data;
	mutex_lock_nested(&ep->mtx, 0);
	// Look up whether this fd is already registered on this epoll instance
	epi = ep_find(ep, tf.file, fd);
	// Dispatch on the operation type
	switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds->events |= EPOLLERR | EPOLLHUP;
	// Perform the registration
error = ep_insert(ep, epds, tf.file, fd, full_check);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, epds);
}
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (full_check) {
clear_tfile_check_list();
loop_check_gen++;
mutex_unlock(&epmutex);
}
fdput(tf);
error_fput:
fdput(f);
error_return:
return error;
}
This function dispatches on the operation type; here we take adding an event (EPOLL_CTL_ADD) as the example.
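The three branches above map one-to-one onto the user-space operations. Continuing the hypothetical register_socket example from earlier, the other two branches would be exercised like this (a sketch only; error handling trimmed):
#include <stddef.h>
#include <sys/epoll.h>

/* Exercise the EPOLL_CTL_MOD and EPOLL_CTL_DEL branches of do_epoll_ctl()
 * for a socket previously added with EPOLL_CTL_ADD. */
static int change_then_remove(int epfd, int sockfd)
{
    /* EPOLL_CTL_MOD branch (ep_modify): switch to writable, edge-triggered. */
    struct epoll_event ev = { .events = EPOLLOUT | EPOLLET, .data.fd = sockfd };
    if (epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev) < 0)
        return -1;

    /* EPOLL_CTL_DEL branch (ep_remove): the event argument is ignored and,
     * since Linux 2.6.9, may be NULL. */
    return epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
}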
EPOLL_CTL_ADD
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, pwake = 0;
__poll_t revents;
struct epitem *epi;
struct ep_pqueue epq;
struct eventpoll *tep = NULL;
if (is_file_epoll(tfile))
tep = tfile->private_data;
lockdep_assert_irqs_enabled();
if (unlikely(percpu_counter_compare(&ep->user->epoll_watches,
max_user_watches) >= 0))
return -ENOSPC;
percpu_counter_inc(&ep->user->epoll_watches);
	// Allocate the epitem object
if (!(epi = kmem_cache_zalloc(epi_cache, GFP_KERNEL))) {
percpu_counter_dec(&ep->user->epoll_watches);
return -ENOMEM;
}
/* Item initialization follow here ... */
	// Initialize the epitem object
INIT_LIST_HEAD(&epi->rdllink);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->next = EP_UNACTIVE_PTR;
if (tep)
mutex_lock_nested(&tep->mtx, 1);
/* Add the current item to the list of active epoll hook for this file */
if (unlikely(attach_epitem(tfile, epi) < 0)) {
if (tep)
mutex_unlock(&tep->mtx);
kmem_cache_free(epi_cache, epi);
percpu_counter_dec(&ep->user->epoll_watches);
return -ENOMEM;
}
if (full_check && !tep)
list_file(tfile);
/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
	// Insert the epitem into the eventpoll's red-black tree
ep_rbtree_insert(ep, epi);
if (tep)
mutex_unlock(&tep->mtx);
/* now check if we've created too many backpaths */
if (unlikely(full_check && reverse_path_check())) {
ep_remove(ep, epi);
return -EINVAL;
}
if (epi->event.events & EPOLLWAKEUP) {
error = ep_create_wakeup_source(epi);
if (error) {
ep_remove(ep, epi);
return error;
}
}
/* Initialize the poll table using the queue callback */
epq.epi = epi;
	// Set the qproc function pointer of the poll_table
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
	// Register the callback (this eventually calls ep_ptable_queue_proc)
revents = ep_item_poll(epi, &epq.pt, 1);
/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
if (unlikely(!epq.epi)) {
ep_remove(ep, epi);
return -ENOMEM;
}
/* We have to drop the new item inside our item list to keep track of it */
write_lock_irq(&ep->lock);
/* record NAPI ID of new item if present */
ep_set_busy_poll_napi_id(epi);
/* If the file is already "ready" we drop it inside the ready list */
if (revents && !ep_is_linked(epi)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
write_unlock_irq(&ep->lock);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(ep, NULL);
return 0;
}
In this function, the epitem object (epoll's red-black tree node) is created and added to the tree, and finally the socket's wait queue is set up. Since this article focuses on the event mechanism, we mainly analyze how the wait queue is set up.
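To keep the moving parts straight, here is a simplified, self-contained model of how the objects relate once ep_insert has finished. These are not the kernel definitions (see struct eventpoll, struct epitem and struct eppoll_entry in fs/eventpoll.c); the model_ names are made up and kernel types are replaced by plain pointers so the sketch stays compilable.
/* One eventpoll per epoll fd, one epitem per watched fd, and one
 * eppoll_entry per wait queue that the watched file exposes. */
struct model_eventpoll;
struct model_epitem;

struct model_wait_entry {                        /* models wait_queue_entry_t      */
    void *private_task;                          /* NULL for epoll (no task)       */
    void (*func)(struct model_wait_entry *);     /* ep_poll_callback for epoll     */
};

struct model_eppoll_entry {                      /* glue between socket and epitem */
    struct model_epitem     *base;               /* back pointer to the epitem     */
    struct model_wait_entry  wait;               /* sits on the socket wait queue  */
};

struct model_epitem {                            /* epoll's red-black tree node    */
    struct model_eventpoll    *ep;               /* owning epoll instance          */
    struct model_eppoll_entry *pwqlist;          /* wait entries created for it    */
    struct model_epitem       *rdllink;          /* linkage on the ready list      */
    unsigned int               events;           /* mask passed via epoll_ctl      */
};

struct model_eventpoll {                         /* one per epoll fd               */
    struct model_epitem *rbtree;                 /* all watched fds                */
    struct model_epitem *rdllist;                /* ready list, read by epoll_wait */
};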
Setting up the socket's wait queue
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
int depth)
{
struct file *file = epi->ffd.file;
__poll_t res;
pt->_key = epi->event.events;
	if (!is_file_epoll(file))
		// For a socket this branch is taken: vfs_poll() ends up in the
		// socket's poll implementation (sock_poll -> tcp_poll), which
		// calls poll_wait() with the socket's wait queue head
		res = vfs_poll(file, pt);
	else
		// Only taken when the monitored file is itself an epoll instance
		res = __ep_eventpoll_poll(file, pt, depth);
return res & epi->event.events;
}
The __ep_eventpoll_poll branch is the nested-epoll path; for an ordinary socket, vfs_poll ends up in tcp_poll, which calls sock_poll_wait with the socket's wait queue head. Both paths converge on poll_wait, so showing the former is enough to reach the call we care about.
static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int depth)
{
	struct eventpoll *ep = file->private_data;
	LIST_HEAD(txlist);
	struct epitem *epi, *tmp;
	poll_table pt;
	__poll_t res = 0;
	init_poll_funcptr(&pt, NULL);
	/* Insert inside our poll wait queue */
	// The call we are interested in
	poll_wait(file, &ep->poll_wait, wait);
	/* ... (rest of the function elided) ... */
	return res;
}
Let's look at the poll_wait function next.
// This function invokes the poll_table's queueing callback
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
	/*
	 * For epoll:
	 * the p->_qproc function pointer here was set by init_poll_funcptr()
	 * in ep_insert() (fs/eventpoll.c), and the function it points to is
	 * ep_ptable_queue_proc.
	 */
p->_qproc(filp, wait_address, p);
}
What is the _qproc field of the poll_table? For that we have to look back: in ep_insert, this function pointer was set via init_poll_funcptr, and its value is the ep_ptable_queue_proc function in fs/eventpoll.c.
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->_qproc = qproc;
pt->_key = ~(__poll_t)0; /* all events enabled */
}
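The pattern at work here is worth pausing on: the file's poll implementation only hands over its wait queue head via poll_wait, and the caller decides what happens with it by choosing the _qproc callback, which is exactly what init_poll_funcptr does for epoll. A tiny self-contained model of that pattern (all names made up, not the kernel API):
#include <stdio.h>

struct model_wait_head { const char *name; };    /* models wait_queue_head_t  */

struct model_poll_table;
typedef void (*model_qproc_t)(struct model_wait_head *, struct model_poll_table *);

struct model_poll_table { model_qproc_t qproc; };/* models poll_table         */

/* Counterpart of poll_wait(): invoke the caller-chosen callback, if any. */
static void model_poll_wait(struct model_wait_head *whead, struct model_poll_table *pt)
{
    if (pt && pt->qproc)
        pt->qproc(whead, pt);
}

/* The "driver" side, like tcp_poll(): report readiness and hand over the
 * socket's wait queue head via model_poll_wait(). */
static unsigned int model_socket_poll(struct model_poll_table *pt)
{
    static struct model_wait_head sk_wq = { "socket wait queue" };
    model_poll_wait(&sk_wq, pt);
    return 0;                                    /* pretend nothing is ready   */
}

/* The "epoll" side, like ep_ptable_queue_proc(): hook onto whatever wait
 * queue the driver exposed. */
static void model_queue_proc(struct model_wait_head *whead, struct model_poll_table *pt)
{
    (void)pt;
    printf("adding a callback entry to: %s\n", whead->name);
}

int main(void)
{
    struct model_poll_table pt = { model_queue_proc };  /* init_poll_funcptr() */
    model_socket_poll(&pt);                             /* ep_item_poll()      */
    return 0;
}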
Now let's look at the logic of ep_ptable_queue_proc.
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct ep_pqueue *epq = container_of(pt, struct ep_pqueue, pt);
struct epitem *epi = epq->epi;
struct eppoll_entry *pwq;
if (unlikely(!epi)) // an earlier allocation has failed
return;
	// Allocate an epoll wait entry (eppoll_entry); its wait member is the entry placed on the socket's wait queue
pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
if (unlikely(!pwq)) {
epq->epi = NULL;
return;
}
	// Set the callback on the socket wait-queue entry
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
if (epi->event.events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(whead, &pwq->wait);
else
		// Add the entry to the socket's wait queue
add_wait_queue(whead, &pwq->wait);
pwq->next = epi->pwqlist;
epi->pwqlist = pwq;
}
static inline void
init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
{
wq_entry->flags = 0;
	// The socket here is managed by epoll, so there is no specific process that must be woken when the socket becomes ready; private is therefore set to NULL
wq_entry->private = NULL;
wq_entry->func = func;
}
This function first creates the epoll wait entry, whose wait field is the entry placed on the socket; it then sets the callback on that socket wait-queue entry and finally adds it to the socket's wait queue. Pay special attention to the callback registered here: it is ep_poll_callback, which is invoked when a socket event becomes ready. Also note that after adding the wait entry the function simply returns. In the synchronous, blocking recv implementation, adding a wait entry is followed by blocking the current thread; here there is no waiting at all, and once the setup is done control returns to user space.
When data arrives
When the softirq thread receives data, the tcp_rcv_established function runs, which eventually wakes up whoever is waiting on the sock object.
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb)
{
...
tcp_data_ready(sk);
...
}
void tcp_data_ready(struct sock *sk)
{
if (tcp_epollin_ready(sk, sk->sk_rcvlowat) || sock_flag(sk, SOCK_DONE))
		// Data is ready; notify the waiters on the socket
sk->sk_data_ready(sk);
}
What is the sock object's sk_data_ready function? For that we go back to when the sock object was created. As described in the article on the socket system call, the inet_create function in net/ipv4/af_inet.c creates the sock object and initializes it (via sock_init_data), setting its sk_data_ready to the sock_def_readable function.
void sock_def_readable(struct sock *sk)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
	// If there are waiters on this socket's wait queue
if (skwq_has_sleeper(wq))
		// Wake up the waiters on the queue
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
EPOLLRDNORM | EPOLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
Next, the entries registered on the socket's wait queue are found and their callback functions are invoked.
#define wake_up_interruptible_sync_poll(x, m) \
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))
void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
void *key)
{
if (unlikely(!wq_head))
return;
	/*
	 * Wake up the waiters.
	 * nr_exclusive is 1 here, meaning that even if multiple exclusive
	 * waiters are blocked on the same socket, only one of them is woken.
	 * This avoids the "thundering herd" problem.
	 */
__wake_up_common_lock(wq_head, mode, 1, WF_SYNC, key);
}
static void __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
unsigned long flags;
wait_queue_entry_t bookmark;
bookmark.flags = 0;
bookmark.private = NULL;
bookmark.func = NULL;
INIT_LIST_HEAD(&bookmark.entry);
do {
spin_lock_irqsave(&wq_head->lock, flags);
		// Perform the actual wakeup
nr_exclusive = __wake_up_common(wq_head, mode, nr_exclusive,
wake_flags, key, &bookmark);
spin_unlock_irqrestore(&wq_head->lock, flags);
} while (bookmark.flags & WQ_FLAG_BOOKMARK);
}
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key,
wait_queue_entry_t *bookmark)
{
wait_queue_entry_t *curr, *next;
int cnt = 0;
lockdep_assert_held(&wq_head->lock);
if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) {
curr = list_next_entry(bookmark, entry);
list_del(&bookmark->entry);
bookmark->flags = 0;
} else
curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);
if (&curr->entry == &wq_head->head)
return nr_exclusive;
	// Iterate over the wait-queue entries
list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
unsigned flags = curr->flags;
int ret;
if (flags & WQ_FLAG_BOOKMARK)
continue;
		// Invoke the entry's callback
ret = curr->func(curr, mode, wake_flags, key);
if (ret < 0)
break;
if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
if (bookmark && (++cnt > WAITQUEUE_WALK_BREAK_CNT) &&
(&next->entry != &wq_head->head)) {
bookmark->flags = WQ_FLAG_BOOKMARK;
list_add_tail(&bookmark->entry, &next->entry);
break;
}
}
return nr_exclusive;
}
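The walk above is also a good place to revisit the contrast noted earlier: a blocking recv puts a wait entry whose private points at the sleeping task and whose func wakes that task, while epoll's entry has private set to NULL and func set to ep_poll_callback. A tiny self-contained model of the walk (made-up names, not the kernel API):
#include <stdio.h>

struct model_wait_entry {
    void *private_task;                          /* task to wake, or NULL for epoll */
    int (*func)(struct model_wait_entry *);      /* the wake callback               */
    struct model_wait_entry *next;
};

/* epoll-style entry: no task; the callback queues the epitem on the ready
 * list and wakes epoll_wait waiters (roughly what ep_poll_callback does). */
static int model_ep_poll_callback(struct model_wait_entry *e)
{
    (void)e;
    printf("epoll entry: queue epitem on rdllist, wake epoll_wait waiters\n");
    return 1;
}

/* recv-style entry: wake the task stored in private (roughly what
 * default_wake_function does via try_to_wake_up). */
static int model_default_wake(struct model_wait_entry *e)
{
    printf("recv entry: wake task %s\n", (const char *)e->private_task);
    return 1;
}

/* Model of the loop in __wake_up_common(): walk the queue, call each func. */
static void model_wake_up_all(struct model_wait_entry *head)
{
    for (struct model_wait_entry *e = head; e != NULL; e = e->next)
        e->func(e);
}

int main(void)
{
    struct model_wait_entry recv_waiter = { (void *)"recv-thread", model_default_wake, NULL };
    struct model_wait_entry epoll_entry = { NULL, model_ep_poll_callback, &recv_waiter };
    model_wake_up_all(&epoll_entry);             /* what sock_def_readable triggers */
    return 0;
}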
The ep_poll_callback callback
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
	// Recover the epitem from the socket wait-queue entry
struct epitem *epi = ep_item_from_wait(wait);
	// Get the corresponding eventpoll object
struct eventpoll *ep = epi->ep;
__poll_t pollflags = key_to_poll(key);
unsigned long flags;
int ewake = 0;
read_lock_irqsave(&ep->lock, flags);
ep_set_busy_poll_napi_id(epi);
/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
if (pollflags && !(pollflags & epi->event.events))
goto out_unlock;
/*
* If we are transferring events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happen during that period of time are
* chained in ep->ovflist and requeued later on.
*/
if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
if (chain_epi_lockless(epi))
ep_pm_stay_awake_rcu(epi);
} else if (!ep_is_linked(epi)) {
/* In the usual case, add event to ready list. */
		// Add this epitem to the eventpoll's ready list; a later epoll_wait checks this list to decide whether any events are ready
if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
ep_pm_stay_awake_rcu(epi);
}
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
	// Check whether there are waiters on the eventpoll's wait queue, i.e. processes blocked in epoll_wait
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!(pollflags & POLLFREE)) {
switch (pollflags & EPOLLINOUT_BITS) {
case EPOLLIN:
if (epi->event.events & EPOLLIN)
ewake = 1;
break;
case EPOLLOUT:
if (epi->event.events & EPOLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
		// Wake up the waiters on that wait queue
wake_up(&ep->wq);
}
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
read_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(ep, epi);
if (!(epi->event.events & EPOLLEXCLUSIVE))
ewake = 1;
if (pollflags & POLLFREE) {
/*
* If we race with ep_remove_wait_queue() it can miss
* ->whead = NULL and do another remove_wait_queue() after
* us, so we can't use __remove_wait_queue().
*/
list_del_init(&wait->entry);
/*
* ->whead != NULL protects us from the race with ep_free()
* or ep_remove(), ep_remove_wait_queue() takes whead->lock
* held by the caller. Once we nullify it, nothing protects
* ep/epi or even wait.
*/
smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
}
return ewake;
}
The steps of this function are as follows:
- Find the epitem from the socket's wait-queue entry;
- Add the epitem to the rdllist; a later epoll_wait checks whether this list has entries to decide whether any events are ready;
- If any threads are blocked in epoll_wait, wake them up.
Waking those blocked threads again goes through the __wake_up_common function introduced above, whose main job is to invoke the callback. So which callback is set on epoll's own wait entry? For that we have to follow the epoll_wait path; in the ep_poll function in fs/eventpoll.c we find that it is set to ep_autoremove_wake_function.
static int ep_autoremove_wake_function(struct wait_queue_entry *wq_entry,
unsigned int mode, int sync, void *key)
{
	// Wake up the thread
int ret = default_wake_function(wq_entry, mode, sync, key);
	// Remove the entry from the wait queue
list_del_init(&wq_entry->entry);
return ret;
}
The key implementation is the default_wake_function function.
int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags,
void *key)
{
WARN_ON_ONCE(IS_ENABLED(CONFIG_SCHED_DEBUG) && wake_flags & ~WF_SYNC);
return try_to_wake_up(curr->private, mode, wake_flags);
}
The real work happens in try_to_wake_up. The function is fairly long; its main logic is to select a CPU for the process, get the corresponding run queue and put the process on it, so that the process can be scheduled again.
static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
unsigned long flags;
	int cpu, success = 0;
	/* (elided: fast path and task-state checks; see kernel/sched/core.c) */
#ifdef CONFIG_SMP
	// Select a suitable CPU for the task
	cpu = select_task_rq(p, p->wake_cpu, wake_flags | WF_TTWU);
if (task_cpu(p) != cpu) {
if (p->in_iowait) {
delayacct_blkio_end(p);
atomic_dec(&task_rq(p)->nr_iowait);
}
wake_flags |= WF_MIGRATED;
psi_ttwu_dequeue(p);
		// Bind the task to the chosen CPU
set_task_cpu(p, cpu);
}
#else
cpu = task_cpu(p);
#endif /* CONFIG_SMP */
	// Queue the task on that CPU's run queue
ttwu_queue(p, cpu, wake_flags);
unlock:
raw_spin_unlock_irqrestore(&p->pi_lock, flags);
out:
if (success)
ttwu_stat(p, task_cpu(p), wake_flags);
preempt_enable();
return success;
}
Summary
This article reveals one of the reasons for epoll's high performance: registering an event does not block the user thread; instead, a callback is installed on the socket's wait queue. The softirq thread (one of whose jobs is receiving network data) invokes that callback and adds the ready epitem to the ready list, so epoll_wait can quickly determine whether any events are ready.
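To close the loop from the user-space side, a minimal event loop built on the mechanism described above might look like this (a sketch that assumes listen_fd is an already-listening TCP socket; error handling is trimmed):
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_EVENTS 64

void run_event_loop(int listen_fd)
{
    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_fd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);       /* ep_insert() path */

    struct epoll_event events[MAX_EVENTS];
    char buf[4096];

    for (;;) {
        /* Sleeps only while the ready list (ep->rdllist) is empty; otherwise
         * returns the events queued there by ep_poll_callback. */
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; i++) {
            int fd = events[i].data.fd;
            if (fd == listen_fd) {
                int conn = accept(listen_fd, NULL, NULL);
                struct epoll_event cev = { .events = EPOLLIN, .data.fd = conn };
                epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &cev);
            } else {
                ssize_t len = read(fd, buf, sizeof(buf));
                if (len <= 0) {                            /* closed or error */
                    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);  /* ep_remove() */
                    close(fd);
                }
            }
        }
    }
}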