Linux中的epoll_wait系统调用

epoll_wait是epoll的核心,用来获取注册到该epoll上的就绪事件。在Linux中的recv系统调用一文中分析了传统的阻塞IO是怎么进行阻塞和唤醒的,而epoll_wait也是可以限时阻塞的,那通过本文的分析可以对比一下两者的执行过程有什么区别。


v5.19.17
epoll_wait
ep_timeout_to_timespec
<
>
c
fs/eventpoll.c
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	struct timespec64 to;

	return do_epoll_wait(epfd, events, maxevents,
			     ep_timeout_to_timespec(&to, timeout));
}
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
			 int maxevents, struct timespec64 *to)
{
	int error;
	struct fd f;
	struct eventpoll *ep;

	/* The maximum number of event must be greater than zero */
	if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
		return -EINVAL;

	/* Verify that the area passed by the user is writeable */
	if (!access_ok(events, maxevents * sizeof(struct epoll_event)))
		return -EFAULT;

	/* Get the "struct file *" for the eventpoll file */
	// 获取到epoll对象的fd
	f = fdget(epfd);
	if (!f.file)
		return -EBADF;

	/*
	 * We have to check that the file structure underneath the fd
	 * the user passed to us _is_ an eventpoll file.
	 */
	error = -EINVAL;
	if (!is_file_epoll(f.file))
		goto error_fput;

	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	ep = f.file->private_data;

	/* Time to fish for events ... */
	// 获取就绪事件
	error = ep_poll(ep, events, maxevents, to);

error_fput:
	fdput(f);
	return error;
}
c
fs/eventpoll.c
static struct timespec64 *ep_timeout_to_timespec(struct timespec64 *to, long ms)
{
	struct timespec64 now;

	if (ms < 0)
		return NULL;

	// 如果用户层传下来的是0
	if (!ms) {
		to->tv_sec = 0;
		to->tv_nsec = 0;
		return to;
	}

	to->tv_sec = ms / MSEC_PER_SEC;
	to->tv_nsec = NSEC_PER_MSEC * (ms % MSEC_PER_SEC);

	ktime_get_ts64(&now);
	*to = timespec64_add_safe(now, *to);
	return to;
}

参数timeout就是当没有就绪事件时阻塞等待的最长时间,这里对时间进行了转化操作,然后就调用真正的实现do_epoll_wait函数。

v5.19.17
ep_poll
ep_events_available
<
>
c
fs/eventpoll.c
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, struct timespec64 *timeout)
{
	int res, eavail, timed_out = 0;
	u64 slack = 0;
	// epoll等待项
	wait_queue_entry_t wait;
	ktime_t expires, *to = NULL;

	lockdep_assert_irqs_enabled();

	if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
		slack = select_estimate_accuracy(timeout);
		to = &expires;
		// 记录睡眠后应该被唤醒的时间点
		*to = timespec64_to_ktime(*timeout);
	} else if (timeout) { // 不阻塞等待的情况
		/*
		 * Avoid the unnecessary trip to the wait queue loop, if the
		 * caller specified a non blocking operation.
		 */
		timed_out = 1;
	}

	/*
	 * This call is racy: We may or may not see events that are being added
	 * to the ready list under the lock (e.g., in IRQ callbacks). For cases
	 * with a non-zero timeout, this thread will check the ready list under
	 * lock and will add to the wait queue.  For cases with a zero
	 * timeout, the user by definition should not care and will have to
	 * recheck again.
	 */
	// 判断就绪队列上有没有事件就绪
	eavail = ep_events_available(ep);

	// 无限循环,因为线程可能会被中断
	while (1) {
		if (eavail) { // 获取到就绪事件的情况
			/*
			 * Try to transfer events to user space. In case we get
			 * 0 events and there's still timeout left over, we go
			 * trying again in search of more luck.
			 */
			// 获取就绪事件数量
			res = ep_send_events(ep, events, maxevents);
			if (res)
				// 返回就绪事件数量
				return res;
		}

		// 如果不阻塞等待,则直接返回
		if (timed_out)
			return 0;

		// 先忙等
		eavail = ep_busy_loop(ep, timed_out);
		if (eavail)
			continue;

		if (signal_pending(current))
			return -EINTR;

		/*
		 * Internally init_wait() uses autoremove_wake_function(),
		 * thus wait entry is removed from the wait queue on each
		 * wakeup. Why it is important? In case of several waiters
		 * each new wakeup will hit the next waiter, giving it the
		 * chance to harvest new event. Otherwise wakeup can be
		 * lost. This is also good performance-wise, because on
		 * normal wakeup path no need to call __remove_wait_queue()
		 * explicitly, thus ep->lock is not taken, which halts the
		 * event delivery.
		 *
		 * In fact, we now use an even more aggressive function that
		 * unconditionally removes, because we don't reuse the wait
		 * entry between loop iterations. This lets us also avoid the
		 * performance issue if a process is killed, causing all of its
		 * threads to wake up without being removed normally.
		 */
		// 创建等待项
		init_wait(&wait);
		// 设置回调函数
		wait.func = ep_autoremove_wake_function;

		write_lock_irq(&ep->lock);
		/*
		 * Barrierless variant, waitqueue_active() is called under
		 * the same lock on wakeup ep_poll_callback() side, so it
		 * is safe to avoid an explicit barrier.
		 */
		// 设置当前线程的状态为可中断睡眠
		__set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * Do the final check under the lock. ep_scan_ready_list()
		 * plays with two lists (->rdllist and ->ovflist) and there
		 * is always a race when both lists are empty for short
		 * period of time although events are pending, so lock is
		 * important.
		 */
		 // 阻塞之前,再次判断有没有就绪事件
		eavail = ep_events_available(ep);
		if (!eavail)
			// 将新的等待项添加到epoll->wq中
			__add_wait_queue_exclusive(&ep->wq, &wait);

		write_unlock_irq(&ep->lock);

		if (!eavail)
			/*
			 * 让出CPU,主动进入睡眠状态,第一个参数to包含了睡眠的时长信息,
			 * 返回值表示是否还要睡眠,取反后就是是否不睡眠了,当前循环中没有修改参数to,可能是该函数内部做了修改。
			 */
			timed_out = !schedule_hrtimeout_range(to, slack,
							      HRTIMER_MODE_ABS);
		// 重新将进程状态设置为运行态
		__set_current_state(TASK_RUNNING);

		/*
		 * We were woken up, thus go and try to harvest some events.
		 * If timed out and still on the wait queue, recheck eavail
		 * carefully under lock, below.
		 */
		eavail = 1;

		if (!list_empty_careful(&wait.entry)) {
			write_lock_irq(&ep->lock);
			/*
			 * If the thread timed out and is not on the wait queue,
			 * it means that the thread was woken up after its
			 * timeout expired before it could reacquire the lock.
			 * Thus, when wait.entry is empty, it needs to harvest
			 * events.
			 */
			if (timed_out)
				eavail = list_empty(&wait.entry);
			__remove_wait_queue(&ep->wq, &wait);
			write_unlock_irq(&ep->lock);
		}
	}
}
c
fs/eventpoll.c
static inline int ep_events_available(struct eventpoll *ep)
{
	// 判断链表中是否有节点
	return !list_empty_careful(&ep->rdllist) ||
		READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
}

该函数中较长,但是主要可以总结为如下:

  • 判断是否有就绪事件(判断链表中是否有节点);
  • 如果有就绪事件,则返回就绪事件数量;
  • 否则:
    • 如果无需阻塞(上层传入的timeout是0),则直接返回0;
    • 否则,睡眠用户传入的时长,睡眠期间线程是可中断的,所以上面函数中要使用无限循环。

可以看到,调用epoll_wait时,用户传入的睡眠时间可以0,这样就算没有就绪事件,也不会阻塞上层的用户线程执行。这是和传统的recv同步阻塞调用最大的区别。