Linux中的epoll_ctl系统调用

在用户空间创建好epoll对象后,就会向其注册socket,并设置好感兴趣的事件。这个步骤是epoll的核心,是后续调用的epoll_wait打下基础。在该系统调用中会为socket设置回调函数,这样事件就绪时会自动修改epoll中的事件列表,而不用用户进程的介入,这也是为什么epoll很高效的原因。


epoll_ctl

还是老规矩,先从系统调用入口开始分析。

v5.19.17
c
fs/eventpoll.c
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	struct epoll_event epds;

	if (ep_op_has_event(op) &&
	    copy_from_user(&epds, event, sizeof(struct epoll_event)))
		return -EFAULT;

	// 注册事件
	return do_epoll_ctl(epfd, op, fd, &epds, false);
}

真正的实现在do_epoll_ctl函数中。

v5.19.17
c
fs/eventpoll.c
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
		 bool nonblock)
{
	int error;
	int full_check = 0;
	struct fd f, tf;
	struct eventpoll *ep;
	struct epitem *epi;
	struct eventpoll *tep = NULL;

	error = -EBADF;
	// 根据epfd找到eventpoll内核对象
	f = fdget(epfd);
	if (!f.file)
		goto error_return;

	/* Get the "struct file *" for the target file */
	// 根据socket句柄号,找到file内核对象
	tf = fdget(fd);
	if (!tf.file)
		goto error_fput;


	// 执行不同类型的操作
	switch (op) {
	case EPOLL_CTL_ADD:
		if (!epi) {
			epds->events |= EPOLLERR | EPOLLHUP;
			// 执行注册
			error = ep_insert(ep, epds, tf.file, fd, full_check);
		} else
			error = -EEXIST;
		break;
	case EPOLL_CTL_DEL:
		if (epi)
			error = ep_remove(ep, epi);
		else
			error = -ENOENT;
		break;
	case EPOLL_CTL_MOD:
		if (epi) {
			if (!(epi->event.events & EPOLLEXCLUSIVE)) {
				epds->events |= EPOLLERR | EPOLLHUP;
				error = ep_modify(ep, epi, epds);
			}
		} else
			error = -ENOENT;
		break;
	}
	mutex_unlock(&ep->mtx);

error_tgt_fput:
	if (full_check) {
		clear_tfile_check_list();
		loop_check_gen++;
		mutex_unlock(&epmutex);
	}

	fdput(tf);
error_fput:
	fdput(f);
error_return:

	return error;
}

在该函数中,会针对操作类型做不同的操作,这里以添加事件为例。

EPOLL_CTL_ADD

v5.19.17
c
fs/eventpoll.c
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
		     struct file *tfile, int fd, int full_check)
{
	int error, pwake = 0;
	__poll_t revents;
	struct epitem *epi;
	struct ep_pqueue epq;
	struct eventpoll *tep = NULL;

	if (is_file_epoll(tfile))
		tep = tfile->private_data;

	lockdep_assert_irqs_enabled();

	if (unlikely(percpu_counter_compare(&ep->user->epoll_watches,
					    max_user_watches) >= 0))
		return -ENOSPC;
	percpu_counter_inc(&ep->user->epoll_watches);

	// 创建epitem对象
	if (!(epi = kmem_cache_zalloc(epi_cache, GFP_KERNEL))) {
		percpu_counter_dec(&ep->user->epoll_watches);
		return -ENOMEM;
	}

	/* Item initialization follow here ... */
	// 对epitem对象进行初始化
	INIT_LIST_HEAD(&epi->rdllink);
	epi->ep = ep;
	ep_set_ffd(&epi->ffd, tfile, fd);
	epi->event = *event;
	epi->next = EP_UNACTIVE_PTR;

	if (tep)
		mutex_lock_nested(&tep->mtx, 1);
	/* Add the current item to the list of active epoll hook for this file */
	if (unlikely(attach_epitem(tfile, epi) < 0)) {
		if (tep)
			mutex_unlock(&tep->mtx);
		kmem_cache_free(epi_cache, epi);
		percpu_counter_dec(&ep->user->epoll_watches);
		return -ENOMEM;
	}

	if (full_check && !tep)
		list_file(tfile);

	/*
	 * Add the current item to the RB tree. All RB tree operations are
	 * protected by "mtx", and ep_insert() is called with "mtx" held.
	 */
	// 将epi插入eventpoll的红黑树中
	ep_rbtree_insert(ep, epi);
	if (tep)
		mutex_unlock(&tep->mtx);

	/* now check if we've created too many backpaths */
	if (unlikely(full_check && reverse_path_check())) {
		ep_remove(ep, epi);
		return -EINVAL;
	}

	if (epi->event.events & EPOLLWAKEUP) {
		error = ep_create_wakeup_source(epi);
		if (error) {
			ep_remove(ep, epi);
			return error;
		}
	}

	/* Initialize the poll table using the queue callback */
	epq.epi = epi;
	// 为poll_table设置qproc函数指针
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

	/*
	 * Attach the item to the poll hooks and get current event bits.
	 * We can safely use the file* here because its usage count has
	 * been increased by the caller of this function. Note that after
	 * this operation completes, the poll callback can start hitting
	 * the new item.
	 */
	// 注册回调函数
	revents = ep_item_poll(epi, &epq.pt, 1);

	/*
	 * We have to check if something went wrong during the poll wait queue
	 * install process. Namely an allocation for a wait queue failed due
	 * high memory pressure.
	 */
	if (unlikely(!epq.epi)) {
		ep_remove(ep, epi);
		return -ENOMEM;
	}

	/* We have to drop the new item inside our item list to keep track of it */
	write_lock_irq(&ep->lock);

	/* record NAPI ID of new item if present */
	ep_set_busy_poll_napi_id(epi);

	/* If the file is already "ready" we drop it inside the ready list */
	if (revents && !ep_is_linked(epi)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);
		ep_pm_stay_awake(epi);

		/* Notify waiting tasks that events are available */
		if (waitqueue_active(&ep->wq))
			wake_up(&ep->wq);
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}

	write_unlock_irq(&ep->lock);

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(ep, NULL);

	return 0;
}

在该函数中,创建了epoll的红黑树节点epi对象,并将其添加到红黑树中,最后设置socket的等待队列。本文关注的是事件机制,所以主要分析等待队列的设置。

设置socket等待队列

v5.19.17
ep_item_poll
__ep_eventpoll_poll
<
>
c
fs/eventpoll.c
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
				 int depth)
{
	struct file *file = epi->ffd.file;
	__poll_t res;

	pt->_key = epi->event.events;
	if (!is_file_epoll(file))
		res = vfs_poll(file, pt);
	else
		// 注册回调
		res = __ep_eventpoll_poll(file, pt, depth);
	return res & epi->event.events;
}
c
fs/eventpoll.c
static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int depth)
{
	struct eventpoll *ep = file->private_data;
	LIST_HEAD(txlist);
	struct epitem *epi, *tmp;
	poll_table pt;
	__poll_t res = 0;

	init_poll_funcptr(&pt, NULL);

	/* Insert inside our poll wait queue */
	// 分析这里
	poll_wait(file, &ep->poll_wait, wait);

	return res;
}

接下来调用了poll_wait函数。

v5.19.17
c
include/linux/poll.h
// 该函数负责触发回调函数
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && p->_qproc && wait_address)
		/*
		 * 对于epoll:
		 * 这里的p->_qproc函数指针是在eventpoll.c中的ep_insert函数中通过init_poll_funcptr函数设置的。
		 * 在那里设置的函数是ep_ptable_queue_proc。
		 */
		p->_qproc(filp, wait_address, p);
}

poll_table_qproc字段是什么?这要往回去找,在ep_insert函数中,通过调用init_poll_funcptr函数来设置了该函数指针,值是fs/eventpoll.c文件中的ep_ptable_queue_proc函数。

v5.19.17
c
include/linux/poll.h
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
	pt->_qproc = qproc;
	pt->_key   = ~(__poll_t)0; /* all events enabled */
}

下面就来看看ep_ptable_queue_proc函数的逻辑。

v5.19.17
ep_ptable_queue_proc
init_waitqueue_func_entry
<
>
c
fs/eventpoll.c
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	struct ep_pqueue *epq = container_of(pt, struct ep_pqueue, pt);
	struct epitem *epi = epq->epi;
	struct eppoll_entry *pwq;

	if (unlikely(!epi))	// an earlier allocation has failed
		return;

	// 新建epoll等待项,其wait成员是socket的等待项
	pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
	if (unlikely(!pwq)) {
		epq->epi = NULL;
		return;
	}

	// 为socket的等待项设置回调函数
	init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
	pwq->whead = whead;
	pwq->base = epi;
	if (epi->event.events & EPOLLEXCLUSIVE)
		add_wait_queue_exclusive(whead, &pwq->wait);
	else
		// 将等待项添加到socket的等待队列中
		add_wait_queue(whead, &pwq->wait);
	pwq->next = epi->pwqlist;
	epi->pwqlist = pwq;
}
c
include/linux/wait.h
static inline void
init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
{
	wq_entry->flags		= 0;
	// 这里socket是由epoll来管理的,所以不需要在socket就绪时进行唤醒的进程,所以设置为了NULL。
	wq_entry->private	= NULL;
	wq_entry->func		= func;
}

该函数先是创建了epoll等待项,其wait字段就是socket的等待项,然后为socket等待项设置了回调函数,最后将其添加到socket的等待队列中。重点注意一下这里设置的函数是ep_poll_callback,socket事件就绪时会调用该函数。 另外强调一下,这里添加了等待项后就直接返回了,而在recv系统调用同步阻塞的实现中,添加了等待项就会阻塞当前线程,而这里没有等待,设置完就直接返回到用户空间中了。

数据来了

在软中断线程接收到数据时,会执行tcp_rcv_established函数,会唤醒阻塞在sock对象上的进程。

c
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb)
{
	...
	tcp_data_ready(sk);
	...
}
v5.19.17
c
net/ipv4/tcp_input.c
void tcp_data_ready(struct sock *sk)
{
	if (tcp_epollin_ready(sk, sk->sk_rcvlowat) || sock_flag(sk, SOCK_DONE))
		// 数据已准备好,唤醒阻塞在socket上的进程
		sk->sk_data_ready(sk);
}

sock对象的sk_data_ready函数是什么?这样回到创建sock对象的时候,在介绍socket系统调用时,讲到在net/ipv4/af_inet.c文件中的inet_create函数中会创建sock对象并初始化它,就会设置它的sk_data_ready函数,具体设置的是sock_def_readable函数。

v5.19.17
c
net/core/sock.c
void sock_def_readable(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	// 如果有进程在此socket的等待队列中
	if (skwq_has_sleeper(wq))
		// 唤醒等待队列中的进程
		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
						EPOLLRDNORM | EPOLLRDBAND);
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	rcu_read_unlock();
}

接下来是找到注册到socket等待队列上的节点,并执行其回调函数。

v5.19.17
wake_up_interruptible_sync_poll
__wake_up_sync_key
<
>
c
include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m)					\
	__wake_up_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))
c
kernel/sched/wait.c
void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
			void *key)
{
	if (unlikely(!wq_head))
		return;

	/*
	 * 唤醒进程,
	 * 这里传入的nr_exclusive是1,表示即使有多个进程阻塞在同一个socket上,也只唤醒一个进程。
	 * 其作用是为了避免”惊群“。
	 */
	__wake_up_common_lock(wq_head, mode, 1, WF_SYNC, key);
}
static void __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key)
{
	unsigned long flags;
	wait_queue_entry_t bookmark;

	bookmark.flags = 0;
	bookmark.private = NULL;
	bookmark.func = NULL;
	INIT_LIST_HEAD(&bookmark.entry);

	do {
		spin_lock_irqsave(&wq_head->lock, flags);
		// 实现进程唤醒
		nr_exclusive = __wake_up_common(wq_head, mode, nr_exclusive,
						wake_flags, key, &bookmark);
		spin_unlock_irqrestore(&wq_head->lock, flags);
	} while (bookmark.flags & WQ_FLAG_BOOKMARK);
}
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key,
			wait_queue_entry_t *bookmark)
{
	wait_queue_entry_t *curr, *next;
	int cnt = 0;

	lockdep_assert_held(&wq_head->lock);

	if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) {
		curr = list_next_entry(bookmark, entry);

		list_del(&bookmark->entry);
		bookmark->flags = 0;
	} else
		curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);

	if (&curr->entry == &wq_head->head)
		return nr_exclusive;

    // 遍历等待队列项
	list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
		unsigned flags = curr->flags;
		int ret;

		if (flags & WQ_FLAG_BOOKMARK)

			continue;

        // 调用回调函数
		ret = curr->func(curr, mode, wake_flags, key);
		if (ret < 0)
			break;
		if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
			break;

		if (bookmark && (++cnt > WAITQUEUE_WALK_BREAK_CNT) &&
				(&next->entry != &wq_head->head)) {
			bookmark->flags = WQ_FLAG_BOOKMARK;
			list_add_tail(&bookmark->entry, &next->entry);
			break;
		}
	}

	return nr_exclusive;
}

ep_poll_callback回调函数

v5.19.17
c
fs/eventpoll.c
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
	int pwake = 0;
	// 根据socket等待项找到epoll节点
	struct epitem *epi = ep_item_from_wait(wait);
	// 获取到对应的eventpoll对象
	struct eventpoll *ep = epi->ep;
	__poll_t pollflags = key_to_poll(key);
	unsigned long flags;
	int ewake = 0;

	read_lock_irqsave(&ep->lock, flags);

	ep_set_busy_poll_napi_id(epi);

	/*
	 * If the event mask does not contain any poll(2) event, we consider the
	 * descriptor to be disabled. This condition is likely the effect of the
	 * EPOLLONESHOT bit that disables the descriptor when an event is received,
	 * until the next EPOLL_CTL_MOD will be issued.
	 */
	if (!(epi->event.events & ~EP_PRIVATE_BITS))
		goto out_unlock;

	/*
	 * Check the events coming with the callback. At this stage, not
	 * every device reports the events in the "key" parameter of the
	 * callback. We need to be able to handle both cases here, hence the
	 * test for "key" != NULL before the event match test.
	 */
	if (pollflags && !(pollflags & epi->event.events))
		goto out_unlock;

	/*
	 * If we are transferring events to userspace, we can hold no locks
	 * (because we're accessing user memory, and because of linux f_op->poll()
	 * semantics). All the events that happen during that period of time are
	 * chained in ep->ovflist and requeued later on.
	 */
	if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
		if (chain_epi_lockless(epi))
			ep_pm_stay_awake_rcu(epi);
	} else if (!ep_is_linked(epi)) {
		/* In the usual case, add event to ready list. */

		// 将当前epoll节点添加到epoll的就绪队列中,后面调用epoll_wait时会判断该链表中有没有节点从而判断有没有就绪事件
		if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
			ep_pm_stay_awake_rcu(epi);
	}

	/*
	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
	 * wait list.
	 */
	// 查看epoll等待队列上是否有等���项,也就是调用epoll_wait被阻塞的那些进程。
	if (waitqueue_active(&ep->wq)) {
		if ((epi->event.events & EPOLLEXCLUSIVE) &&
					!(pollflags & POLLFREE)) {
			switch (pollflags & EPOLLINOUT_BITS) {
			case EPOLLIN:
				if (epi->event.events & EPOLLIN)
					ewake = 1;
				break;
			case EPOLLOUT:
				if (epi->event.events & EPOLLOUT)
					ewake = 1;
				break;
			case 0:
				ewake = 1;
				break;
			}
		}
		// 对等待队列中的等待项执行唤醒操作
		wake_up(&ep->wq);
	}
	if (waitqueue_active(&ep->poll_wait))
		pwake++;

out_unlock:
	read_unlock_irqrestore(&ep->lock, flags);

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(ep, epi);

	if (!(epi->event.events & EPOLLEXCLUSIVE))
		ewake = 1;

	if (pollflags & POLLFREE) {
		/*
		 * If we race with ep_remove_wait_queue() it can miss
		 * ->whead = NULL and do another remove_wait_queue() after
		 * us, so we can't use __remove_wait_queue().
		 */
		list_del_init(&wait->entry);
		/*
		 * ->whead != NULL protects us from the race with ep_free()
		 * or ep_remove(), ep_remove_wait_queue() takes whead->lock
		 * held by the caller. Once we nullify it, nothing protects
		 * ep/epi or even wait.
		 */
		smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
	}

	return ewake;
}

该函数的步骤如下:

  • 根据socket的等待项找epoll的节点;
  • 将epoll节点添加到rdllink链表中,后续epoll_wait操作会判断该链表中是否有节点,从而来判断是否有就绪事件。
  • 如果有因为epoll_wait操作阻塞的线程,那么唤醒它们。

唤醒这些被阻塞的线程一样会调用到上面介绍过的__wake_up_common函数,主要逻辑是调用回调函数。那epoll的等待项的回调函数设置的是哪个函数呢?这就要到epoll_wait操作的执行流程中去找,最终在fs/eventpoll.c的ep_poll函数中找到,设置的是ep_autoremove_wake_function函数。

v5.19.17
c
fs/eventpoll.c
static int ep_autoremove_wake_function(struct wait_queue_entry *wq_entry,
				       unsigned int mode, int sync, void *key)
{
	// 唤醒线程
	int ret = default_wake_function(wq_entry, mode, sync, key);

	// 从等待队列中移除
	list_del_init(&wq_entry->entry);
	return ret;
}

关键实现是default_wake_function函数。

v5.19.17
c
kernel/sched/core.c
int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags,
			  void *key)
{
	WARN_ON_ONCE(IS_ENABLED(CONFIG_SCHED_DEBUG) && wake_flags & ~WF_SYNC);
	return try_to_wake_up(curr->private, mode, wake_flags);
}

真正的实现在try_to_wake_up函数中,这个函数比较长,主要的逻辑是重新为进程选择一个CPU,然后获取对应的runq,并将进程放入进去,从而该进程就可以被调度了。

v5.19.17
c
kernel/sched/core.c
static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
	unsigned long flags;
	int cpu, success = 0;


	// 选择一个合适的CPU
	cpu = select_task_rq(p, p->wake_cpu, wake_flags | WF_TTWU);
	if (task_cpu(p) != cpu) {
		if (p->in_iowait) {
			delayacct_blkio_end(p);
			atomic_dec(&task_rq(p)->nr_iowait);
		}

		wake_flags |= WF_MIGRATED;
		psi_ttwu_dequeue(p);
		// 为进程设置CPU
		set_task_cpu(p, cpu);
	}
#else
	cpu = task_cpu(p);
#endif /* CONFIG_SMP */

	// 将任务添加到CPU运行队列中
	ttwu_queue(p, cpu, wake_flags);
unlock:
	raw_spin_unlock_irqrestore(&p->pi_lock, flags);
out:
	if (success)
		ttwu_stat(p, task_cpu(p), wake_flags);
	preempt_enable();

	return success;
}

总结

通过本文可以发现epoll高性能的原因之一,在注册事件时不会阻塞用户线程,而是通过设置socket等待队列回调函数的形式。由软中断线程(职责之一就是负责接收网络数据)来调用我们设置的回调函数,将就绪的epoll节点添加到链表中,这样epoll_wait就可以快速判断是否有就绪事件了。