在Linux中,不管是在服务端,还是在客户端,都会通过调用recv或recvfrom系统调用来获取对方发送来的数据,如果有数据,那么就读取并交给用户线程来处理,但如果没有数据线程则会阻塞。本文就来分析一下recv系统调用的执行过程。
系统调用层
SYSCALL_DEFINE4(recv, int, fd, void __user *, ubuf, size_t, size,
unsigned int, flags)
{
return __sys_recvfrom(fd, ubuf, size, flags, NULL, NULL);
}
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
unsigned int, flags, struct sockaddr __user *, addr,
int __user *, addr_len)
{
return __sys_recvfrom(fd, ubuf, size, flags, addr, addr_len);
}
可以看到两个系统调用调用的是同一个函数,只是参数不同而已,一般用recv比较多。
int __sys_recvfrom(int fd, void __user *ubuf, size_t size, unsigned int flags,
struct sockaddr __user *addr, int __user *addr_len)
{
struct sockaddr_storage address;
struct msghdr msg = {
/* Save some cycles and don't copy the address if not needed */
.msg_name = addr ? (struct sockaddr *)&address : NULL,
};
struct socket *sock;
struct iovec iov;
int err, err2;
int fput_needed;
err = import_single_range(READ, ubuf, size, &iov, &msg.msg_iter);
if (unlikely(err))
return err;
// 根据传入的fd找到socket对象
sock = sockfd_lookup_light(fd, &err, &fput_needed);
if (!sock)
goto out;
if (sock->file->f_flags & O_NONBLOCK)
flags |= MSG_DONTWAIT;
// 接收消息
err = sock_recvmsg(sock, &msg, flags);
if (err >= 0 && addr != NULL) {
err2 = move_addr_to_user(&address,
msg.msg_namelen, addr, addr_len);
if (err2 < 0)
err = err2;
}
fput_light(sock->file, fput_needed);
out:
return err;
}
int sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags)
{
int err = security_socket_recvmsg(sock, msg, msg_data_left(msg), flags);
// 接收消息
return err ?: sock_recvmsg_nosec(sock, msg, flags);
}
static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
int flags)
{
// 调用socket对象中ops的recvmsg函数
return INDIRECT_CALL_INET(sock->ops->recvmsg, inet6_recvmsg,
inet_recvmsg, sock, msg, msg_data_left(msg),
flags);
}
网络连接会被抽象为一个文件,所以该函数中先通过fd查找socket对象,然后再调用sock_recvmsg函数来接收消息。本文仅分析IPV4,所以最后会调用inet_recvmsg函数。
int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int flags)
{
struct sock *sk = sock->sk;
int addr_len = 0;
int err;
if (likely(!(flags & MSG_ERRQUEUE)))
sock_rps_record_flow(sk);
// 根据协议是tcp还是udp来调用对应的recvmsg函数
err = INDIRECT_CALL_2(sk->sk_prot->recvmsg, tcp_recvmsg, udp_recvmsg,
sk, msg, size, flags, &addr_len);
if (err >= 0)
msg->msg_namelen = addr_len;
return err;
}
在该函数中,针对是TCP还是UDP调用了不同的方法,本文以最常用的TCP为例来分析。
TCP层
/*
* 从sk的接收队列中拷贝数据到用户进程的内存空间中,
* msghdr保存了用户空间的缓冲区信息,需要拷贝数据到这些缓冲区中
*/
int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int flags,
int *addr_len)
{
int cmsg_flags = 0, ret;
struct scm_timestamping_internal tss;
if (unlikely(flags & MSG_ERRQUEUE))
return inet_recv_error(sk, msg, len, addr_len);
if (sk_can_busy_loop(sk) &&
skb_queue_empty_lockless(&sk->sk_receive_queue) &&
sk->sk_state == TCP_ESTABLISHED)
sk_busy_loop(sk, flags & MSG_DONTWAIT);
/*
* 锁住sk,避免其他进程也来读该sk,以及软中断线程往sk的队列中写数据。
*/
lock_sock(sk);
// 接收数据-为应用进程拷贝数据
ret = tcp_recvmsg_locked(sk, msg, len, flags, &tss, &cmsg_flags);
// 释放锁
release_sock(sk);
if ((cmsg_flags || msg->msg_get_inq) && ret >= 0) {
if (cmsg_flags & TCP_CMSG_TS)
tcp_recv_timestamp(msg, sk, &tss);
if (msg->msg_get_inq) {
msg->msg_inq = tcp_inq_hint(sk);
if (cmsg_flags & TCP_CMSG_INQ)
put_cmsg(msg, SOL_TCP, TCP_CM_INQ,
sizeof(msg->msg_inq), &msg->msg_inq);
}
}
return ret;
}
该函数主要是做并发控制,防止多个线程读同一个socket,真正的读取操作是在tcp_recvmsg_locked函数中。
static int tcp_recvmsg_locked(struct sock *sk, struct msghdr *msg, size_t len,
int flags, struct scm_timestamping_internal *tss,
int *cmsg_flags)
{
struct tcp_sock *tp = tcp_sk(sk);
// 记录实际拷贝的数据的大小
int copied = 0;
u32 peek_seq;
u32 *seq;
unsigned long used;
int err;
int target; /* Read at least this many bytes */
long timeo;
struct sk_buff *skb, *last;
u32 urg_hole = 0;
err = -ENOTCONN;
// 还在监听状态,说明接收不了数据
if (sk->sk_state == TCP_LISTEN)
goto out;
if (tp->recvmsg_inq) {
*cmsg_flags = TCP_CMSG_INQ;
msg->msg_get_inq = 1;
}
// 获取读数据的超时时间,如果标记中设置了MSG_DONTWAIT,说明是非阻塞模式
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
// 获取tcp_sock结构体中下一个拷贝字节序字��的地址
seq = &tp->copied_seq;
// MSG_PEEK是指只读数据,不修改copied_seq字段
if (flags & MSG_PEEK) {
peek_seq = tp->copied_seq;
seq = &peek_seq;
}
// 获取需要拷贝的字节数
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
/*
* 在循环中拷贝数据,直到len等于0,当然循环过程中有break的情况。
*/
do {
u32 offset;
/* Next get a buffer. */
// 从receive队列末尾取数据
last = skb_peek_tail(&sk->sk_receive_queue);
// 遍历接收队列中的数据
skb_queue_walk(&sk->sk_receive_queue, skb) {
// 这里紧接着就对last进行赋值,那么上面两行的赋值有什么意义?
last = skb;
/* Now that we have two receive queues this
* shouldn't happen.
*/
// 出错的情况
if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
"TCP recvmsg seq # bug: copied %X, seq %X, rcvnxt %X, fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
flags))
break;
// 获取当前skb中已经拷贝的字节数,通过上次拷贝的序号减去当前skb的起始序列号
offset = *seq - TCP_SKB_CB(skb)->seq;
// 如果TCP标��中包含SYN
if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) {
pr_err_once("%s: found a SYN, please report !\n", __func__);
offset--;
}
/*
* 如果当前skb还没被拷贝完
* 注意这里不需要小于等于
*/
if (offset < skb->len)
// 找到第一个需要拷贝的skb
goto found_ok_skb;
// 如果skb中包含FIN标记
if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
goto found_fin_ok;
WARN(!(flags & MSG_PEEK),
"TCP recvmsg seq # bug 2: copied %X, seq %X, rcvnxt %X, fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
}
/* Well, if we have backlog, try to process it now yet. */
// 如果已经拷贝到足够(target)的数据,而且backlog队列中已经没有节点了,则跳出循环
if (copied >= target && !READ_ONCE(sk->sk_backlog.tail))
break;
// 如果已经拷贝的长度大于等于了目标长度,那么
if (copied >= target) {
/* Do not sleep, just process backlog. */
__sk_flush_backlog(sk);
} else { // 没有收到足够的数据,
tcp_cleanup_rbuf(sk, copied);
// 阻塞当前进程,阻塞时间由timeo决定
sk_wait_data(sk, &timeo, last);
}
if ((flags & MSG_PEEK) &&
(peek_seq - copied - urg_hole != tp->copied_seq)) {
net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK\n",
current->comm,
task_pid_nr(current));
peek_seq = tp->copied_seq;
}
continue;
found_ok_skb:
/* Ok so how much can we use? */
// 计算还需要拷贝的字节数
used = skb->len - offset;
if (len < used)
used = len;
// MSG_TRUNC表示超出缓冲区
if (!(flags & MSG_TRUNC)) {
// 从skb中拷贝数据到msg中,offset是在skb中的偏移,而used则是需要拷贝的字节数
err = skb_copy_datagram_msg(skb, offset, msg, used);
if (err) {
/* Exception. Bailout! */
if (!copied)
copied = -EFAULT;
break;
}
}
/*
* 计算并修改下一次拷贝期望的序列号,注意这里的修改是会反应到tcp_seq结构体中的copied_seq字段的,
* 因为上面的seq获取的是字段的地址
*/
WRITE_ONCE(*seq, *seq + used);
// 累积已拷贝的数据长度
copied += used;
// 计算最新还需要拷贝的长度
len -= used;
tcp_rcv_space_adjust(sk);
} while (len > 0);
/* According to UNIX98, msg_name/msg_namelen are ignored
* on connected socket. I was just happy when found this 8) --ANK
*/
/* Clean up data we have read: This will do ACK frames. */
tcp_cleanup_rbuf(sk, copied);
return copied;
out:
return err;
recv_urg:
err = tcp_recv_urg(sk, msg, len, flags);
goto out;
recv_sndq:
err = tcp_peek_sndq(sk, msg, len);
goto out;
}
该函数非常长,主要的逻辑在do...while循环中,会调用函数skb_copy_datagram_msg将数据从skb拷贝到用户传递下来的msg对象中。循环条件中的len就是上次用户传下来的所要读取的数据长度,直到读取到足够的数据才会返回。如果没有接收到足够的数据会做什么呢?如果没有数据,则当前线程会调用sk_wait_data函数并主动进入等待状态。
阻塞实现过程
int sk_wait_data(struct sock *sk, long *timeo, const struct sk_buff *skb)
{
/*
* 创建一个名为wait的等待队列项,并注册回调函数woken_wake_function,
* 并把当前进程描述符current关联到其.private成员上。
*/
DEFINE_WAIT_FUNC(wait, woken_wake_function);
int rc;
/*
* 先调用sk_sleep函数获取sock对象下的等待队列列表头wait_queue_head,
* 然后调用add_wait_queue将新创建的wait这个等待队列项插入到sock对象的等待队列中。
*
* 后续等该socket上的数据就绪时,内核可以查找socket等待队列上的等待项,
* 进而可以找到回调函数和在等待该socket就绪事件的进程。
*/
add_wait_queue(sk_sleep(sk), &wait);
sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
// 让出CPU,进程将进入睡眠状态。
rc = sk_wait_event(sk, timeo, skb_peek_tail(&sk->sk_receive_queue) != skb, &wait);
sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
remove_wait_queue(sk_sleep(sk), &wait);
return rc;
}
#define DEFINE_WAIT_FUNC(name, function) \
struct wait_queue_entry name = { \
.private = current, \
.func = function, \
.entry = LIST_HEAD_INIT((name).entry), \
}
该函数主要就是3个核心步骤:
- 创建等待项:封装了当前线程和回调函数;
- 将等待项添加到sock对象的等待队列中;
- 最后主动让出CPU;
进入阻塞
#define sk_wait_event(__sk, __timeo, __condition, __wait) \
({ int __rc; \
/* 让出CPU前,释放锁 */ \
release_sock(__sk); \
__rc = __condition; \
if (!__rc) { \
*(__timeo) = wait_woken(__wait, \
TASK_INTERRUPTIBLE, \
*(__timeo)); \
} \
sched_annotate_sleep(); \
/* 再次获取CPU,重新上锁 */ \
lock_sock(__sk); \
__rc = __condition; \
__rc; \
}) \
在阻塞前后会分别释放和获取sock的锁,防止同时多个线程操作同一个sock对象。关键在于调用的wait_woken函数。
long wait_woken(struct wait_queue_entry *wq_entry, unsigned mode, long timeout)
{
/*
* The below executes an smp_mb(), which matches with the full barrier
* executed by the try_to_wake_up() in woken_wake_function() such that
* either we see the store to wq_entry->flags in woken_wake_function()
* or woken_wake_function() sees our store to current->state.
*/
set_current_state(mode); /* A */
if (!(wq_entry->flags & WQ_FLAG_WOKEN) && !is_kthread_should_stop())
// 等待最多timeout这么久
// 具体实现在kernel/time/timer.c文件中
timeout = schedule_timeout(timeout);
// 重新将进程状态设置为RUNNING
__set_current_state(TASK_RUNNING);
/*
* The below executes an smp_mb(), which matches with the smp_mb() (C)
* in woken_wake_function() such that either we see the wait condition
* being true or the store to wq_entry->flags in woken_wake_function()
* follows ours in the coherence order.
*/
smp_store_mb(wq_entry->flags, wq_entry->flags & ~WQ_FLAG_WOKEN); /* B */
return timeout;
}
该函数中主要调用的是schedule_timeout函数来实现阻塞的。最后会调用到kernel/sched/core.c文件中的__schedule函数,在该函数中,会将当前线程从当前CPU的runq中移除,并从中获取下一个任务,然后进行实现上下文切换。由于是限时阻塞,所以会使用一个timer来控制阻塞的时间,时间一到timer就会重新放入CPU的runq中。这里的时间是sock对象的最大接收时间,在上面的tcp_recvmsg_locked函数中会获取,并一路传下来。
唤醒进程
有阻塞就有唤醒,在软中断线程接收到数据时,会执行tcp_rcv_established函数,会唤醒阻塞在sock对象上的进程。
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb)
{
...
tcp_data_ready(sk);
...
}
void tcp_data_ready(struct sock *sk)
{
if (tcp_epollin_ready(sk, sk->sk_rcvlowat) || sock_flag(sk, SOCK_DONE))
// 数据已准备好,唤醒阻塞在socket上的进程
sk->sk_data_ready(sk);
}
sock对象的sk_data_ready函数是什么?这样回到创建sock对象的时候,在介绍socket系统调用时,讲到在net/ipv4/af_inet.c文件中的inet_create函数中会创建sock对象并初始化它,就会设置它的sk_data_ready函数,具体设置的是sock_def_readable函数。
void sock_def_readable(struct sock *sk)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
// 如果有进程在此socket的等待队列中
if (skwq_has_sleeper(wq))
// 唤醒等待队列中的进程
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
EPOLLRDNORM | EPOLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
在该函数中,会唤醒阻塞在该sock对象上的进程。
#define wake_up_interruptible_sync_poll(x, m) \
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))
void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
void *key)
{
if (unlikely(!wq_head))
return;
/*
* 唤醒进程,
* 这里传入的nr_exclusive是1,表示即使有多个进程阻塞在同一个socket上,也只唤醒一个进程。
* 其作用是为了避免”惊群“。
*/
__wake_up_common_lock(wq_head, mode, 1, WF_SYNC, key);
}
static void __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
unsigned long flags;
wait_queue_entry_t bookmark;
bookmark.flags = 0;
bookmark.private = NULL;
bookmark.func = NULL;
INIT_LIST_HEAD(&bookmark.entry);
do {
spin_lock_irqsave(&wq_head->lock, flags);
// 实现进程唤醒
nr_exclusive = __wake_up_common(wq_head, mode, nr_exclusive,
wake_flags, key, &bookmark);
spin_unlock_irqrestore(&wq_head->lock, flags);
} while (bookmark.flags & WQ_FLAG_BOOKMARK);
}
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key,
wait_queue_entry_t *bookmark)
{
wait_queue_entry_t *curr, *next;
int cnt = 0;
lockdep_assert_held(&wq_head->lock);
if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) {
curr = list_next_entry(bookmark, entry);
list_del(&bookmark->entry);
bookmark->flags = 0;
} else
curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);
if (&curr->entry == &wq_head->head)
return nr_exclusive;
// 遍历等待队列项
list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
unsigned flags = curr->flags;
int ret;
if (flags & WQ_FLAG_BOOKMARK)
continue;
// 调用回调函数
ret = curr->func(curr, mode, wake_flags, key);
if (ret < 0)
break;
if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
if (bookmark && (++cnt > WAITQUEUE_WALK_BREAK_CNT) &&
(&next->entry != &wq_head->head)) {
bookmark->flags = WQ_FLAG_BOOKMARK;
list_add_tail(&bookmark->entry, &next->entry);
break;
}
}
return nr_exclusive;
}
在__wake_up_common函数中,会遍历sock对象的等待项,并调用每一项设置的回调。上面阻塞进程时,设置的回调函数是woken_wake_function。
int woken_wake_function(struct wait_queue_entry *wq_entry, unsigned mode, int sync, void *key)
{
/* Pairs with the smp_store_mb() in wait_woken(). */
smp_mb(); /* C */
wq_entry->flags |= WQ_FLAG_WOKEN;
return default_wake_function(wq_entry, mode, sync, key);
}
在该函数中,会调用到线程调度相关的函数。
int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags,
void *key)
{
WARN_ON_ONCE(IS_ENABLED(CONFIG_SCHED_DEBUG) && wake_flags & ~WF_SYNC);
return try_to_wake_up(curr->private, mode, wake_flags);
}
真正的实现在try_to_wake_up函数中,这个函数比较长,主要的逻辑是重新为进程选择一个CPU,然后获取对应的runq,并将进程放入进去,从而该进程就可以被调度了。
static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
unsigned long flags;
int cpu, success = 0;
// 选择一个合适的CPU
cpu = select_task_rq(p, p->wake_cpu, wake_flags | WF_TTWU);
if (task_cpu(p) != cpu) {
if (p->in_iowait) {
delayacct_blkio_end(p);
atomic_dec(&task_rq(p)->nr_iowait);
}
wake_flags |= WF_MIGRATED;
psi_ttwu_dequeue(p);
// 为进程设置CPU
set_task_cpu(p, cpu);
}
#else
cpu = task_cpu(p);
#endif /* CONFIG_SMP */
// 将任务添加到CPU运行队列中
ttwu_queue(p, cpu, wake_flags);
unlock:
raw_spin_unlock_irqrestore(&p->pi_lock, flags);
out:
if (success)
ttwu_stat(p, task_cpu(p), wake_flags);
preempt_enable();
return success;
}
总结
虽然本文明面上是在介绍recv系统调用,但实际上更多的是在介绍进程的阻塞和唤醒。阻塞的过程就是从所在的CPU的runq中取出来,如果是限时阻塞,则设置一个timer,时间一到就重新加入进去。唤醒的过程就是重新选择一个CPU,并将进程添加到它的runq中去。