Linux中的epoll_wait系统调用
epoll_wait是epoll的核心,用来获取注册到该epoll上的就绪事件。在Linux中的recv系统调用一文中分析了传统的阻塞IO是怎么进行阻塞和唤醒的,而epoll_wait也是可以限时阻塞的,那通过本文的分析可以对比一下两者的执行过程有什么区别。
epoll_wait
ep_timeout_to_timespec
c
fs/eventpoll.c
/*
 * epoll_wait(2) system call entry point.
 *
 * Converts the userspace millisecond timeout into a timespec64 (NULL for
 * "block forever", zeroed for "don't block") and delegates all real work
 * to do_epoll_wait().
 */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
struct timespec64 to;
return do_epoll_wait(epfd, events, maxevents,
ep_timeout_to_timespec(&to, timeout));
}
/*
 * do_epoll_wait - core of the epoll_wait() system call.
 *
 * Validates the arguments, resolves @epfd to its eventpoll instance and
 * harvests ready events via ep_poll().
 *
 * Returns the number of ready events, 0 on timeout, or a negative errno
 * (-EINVAL, -EFAULT, -EBADF, or whatever ep_poll() reports).
 */
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, struct timespec64 *to)
{
struct eventpoll *ep;
struct fd f;
int err;

/* Reject a non-positive or absurdly large event count. */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;

/* The user buffer must be writable for maxevents entries. */
if (!access_ok(events, maxevents * sizeof(struct epoll_event)))
return -EFAULT;

/* Resolve the descriptor; from here on we must fdput() before returning. */
f = fdget(epfd);
if (!f.file)
return -EBADF;

/* Only a file created by epoll_create() may be waited on. */
err = -EINVAL;
if (!is_file_epoll(f.file))
goto out_fput;

/* For an eventpoll file, private_data is the eventpoll object itself. */
ep = f.file->private_data;

/* Block (per @to) until events are ready, then copy them out. */
err = ep_poll(ep, events, maxevents, to);

out_fput:
fdput(f);
return err;
}
c
fs/eventpoll.c
static struct timespec64 *ep_timeout_to_timespec(struct timespec64 *to, long ms)
{
struct timespec64 now;
if (ms < 0)
return NULL;
// 如果用户层传下来的是0
if (!ms) {
to->tv_sec = 0;
to->tv_nsec = 0;
return to;
}
to->tv_sec = ms / MSEC_PER_SEC;
to->tv_nsec = NSEC_PER_MSEC * (ms % MSEC_PER_SEC);
ktime_get_ts64(&now);
*to = timespec64_add_safe(now, *to);
return to;
}
参数timeout就是当没有就绪事件时阻塞等待的最长时间,这里对时间进行了转化操作,然后就调用真正的实现do_epoll_wait函数。
ep_poll
ep_events_available
c
fs/eventpoll.c
/*
 * ep_poll - retrieve ready events for an eventpoll instance, blocking if
 * necessary until events arrive, a signal is pending, or the timeout
 * expires.
 *
 * @ep:        the eventpoll instance to wait on
 * @events:    userspace buffer that receives ready events
 * @maxevents: capacity of @events
 * @timeout:   absolute deadline; NULL means block indefinitely, a zeroed
 *             timespec means do not block at all
 *
 * Returns the number of events copied to userspace, 0 on timeout, or
 * -EINTR when a signal is pending.
 */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, struct timespec64 *timeout)
{
int res, eavail, timed_out = 0;
u64 slack = 0;
/* Wait-queue entry linking this task onto ep->wq while it sleeps. */
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
lockdep_assert_irqs_enabled();
if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
slack = select_estimate_accuracy(timeout);
to = &expires;
/* Record the absolute point in time at which to wake up. */
*to = timespec64_to_ktime(*timeout);
} else if (timeout) { /* zero timeout: non-blocking poll */
/*
 * Avoid the unnecessary trip to the wait queue loop, if the
 * caller specified a non blocking operation.
 */
timed_out = 1;
}
/*
 * This call is racy: We may or may not see events that are being added
 * to the ready list under the lock (e.g., in IRQ callbacks). For cases
 * with a non-zero timeout, this thread will check the ready list under
 * lock and will add to the wait queue. For cases with a zero
 * timeout, the user by definition should not care and will have to
 * recheck again.
 */
/* Lockless first check: is anything on the ready list? */
eavail = ep_events_available(ep);
/* Loop because wakeups can be spurious (e.g. signals, lost races). */
while (1) {
if (eavail) { /* events appear to be available */
/*
 * Try to transfer events to user space. In case we get
 * 0 events and there's still timeout left over, we go
 * trying again in search of more luck.
 */
/* Copy ready events to the user buffer. */
res = ep_send_events(ep, events, maxevents);
if (res)
/* Got at least one event (or an error): done. */
return res;
}
/* Non-blocking call, or the deadline already passed: give up. */
if (timed_out)
return 0;
/* Busy-poll first, before paying the cost of sleeping. */
eavail = ep_busy_loop(ep, timed_out);
if (eavail)
continue;
if (signal_pending(current))
return -EINTR;
/*
 * Internally init_wait() uses autoremove_wake_function(),
 * thus wait entry is removed from the wait queue on each
 * wakeup. Why it is important? In case of several waiters
 * each new wakeup will hit the next waiter, giving it the
 * chance to harvest new event. Otherwise wakeup can be
 * lost. This is also good performance-wise, because on
 * normal wakeup path no need to call __remove_wait_queue()
 * explicitly, thus ep->lock is not taken, which halts the
 * event delivery.
 *
 * In fact, we now use an even more aggressive function that
 * unconditionally removes, because we don't reuse the wait
 * entry between loop iterations. This lets us also avoid the
 * performance issue if a process is killed, causing all of its
 * threads to wake up without being removed normally.
 */
/* Prepare a fresh wait entry for this iteration. */
init_wait(&wait);
/* Override the wake function with the unconditionally-removing one. */
wait.func = ep_autoremove_wake_function;
write_lock_irq(&ep->lock);
/*
 * Barrierless variant, waitqueue_active() is called under
 * the same lock on wakeup ep_poll_callback() side, so it
 * is safe to avoid an explicit barrier.
 */
/* Mark ourselves interruptible-sleeping before the final check. */
__set_current_state(TASK_INTERRUPTIBLE);
/*
 * Do the final check under the lock. ep_scan_ready_list()
 * plays with two lists (->rdllist and ->ovflist) and there
 * is always a race when both lists are empty for short
 * period of time although events are pending, so lock is
 * important.
 */
/* Re-check for events under the lock, right before sleeping. */
eavail = ep_events_available(ep);
if (!eavail)
/* Still nothing: queue this task on ep->wq (exclusive waiter). */
__add_wait_queue_exclusive(&ep->wq, &wait);
write_unlock_irq(&ep->lock);
if (!eavail)
/*
 * Yield the CPU and sleep until woken or until the absolute
 * deadline *to (NULL = sleep forever). schedule_hrtimeout_range()
 * returns 0 when the timer expired, so timed_out is set exactly
 * on timeout.
 */
timed_out = !schedule_hrtimeout_range(to, slack,
HRTIMER_MODE_ABS);
/* Back to runnable, whatever woke us. */
__set_current_state(TASK_RUNNING);
/*
 * We were woken up, thus go and try to harvest some events.
 * If timed out and still on the wait queue, recheck eavail
 * carefully under lock, below.
 */
eavail = 1;
if (!list_empty_careful(&wait.entry)) {
write_lock_irq(&ep->lock);
/*
 * If the thread timed out and is not on the wait queue,
 * it means that the thread was woken up after its
 * timeout expired before it could reacquire the lock.
 * Thus, when wait.entry is empty, it needs to harvest
 * events.
 */
if (timed_out)
eavail = list_empty(&wait.entry);
__remove_wait_queue(&ep->wq, &wait);
write_unlock_irq(&ep->lock);
}
}
}
c
fs/eventpoll.c
/*
 * ep_events_available - lockless check for pending events.
 *
 * True when the ready list (rdllist) is non-empty, or when ovflist is
 * active (!= EP_UNACTIVE_PTR) — per the comment in ep_poll(), events can
 * transiently live on ovflist while the ready list is being scanned.
 * The check is racy by design; callers that care re-check under ep->lock.
 */
static inline int ep_events_available(struct eventpoll *ep)
{
return !list_empty_careful(&ep->rdllist) ||
READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
}
该函数较长,但其主要流程可以总结如下:
- 判断是否有就绪事件(判断链表中是否有节点);
- 如果有就绪事件,则返回就绪事件数量;
- 否则:
- 如果无需阻塞(上层传入的timeout是0),则直接返回0;
- 否则,睡眠用户传入的时长,睡眠期间线程是可中断的,所以上面函数中要使用无限循环。
可以看到,调用epoll_wait时,用户传入的睡眠时间可以是0,这样就算没有就绪事件,也不会阻塞上层的用户线程执行。这是它和传统的recv同步阻塞调用最大的区别。