Linux中的recv系统调用

在Linux中,不管是在服务端,还是在客户端,都会通过调用recv或recvfrom系统调用来获取对方发送来的数据,如果有数据,那么就读取并交给用户线程来处理,但如果没有数据线程则会阻塞。本文就来分析一下recv系统调用的执行过程。


系统调用层

v5.19.17
recv
recvfrom
<
>
c
net/socket.c
SYSCALL_DEFINE4(recv, int, fd, void __user *, ubuf, size_t, size,
		unsigned int, flags)
{
	return __sys_recvfrom(fd, ubuf, size, flags, NULL, NULL);
}
c
net/socket.c
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
		unsigned int, flags, struct sockaddr __user *, addr,
		int __user *, addr_len)
{
	return __sys_recvfrom(fd, ubuf, size, flags, addr, addr_len);
}

可以看到两个系统调用调用的是同一个函数,只是参数不同而已,一般用recv比较多。

v5.19.17
c
net/socket.c
int __sys_recvfrom(int fd, void __user *ubuf, size_t size, unsigned int flags,
		   struct sockaddr __user *addr, int __user *addr_len)
{
	struct sockaddr_storage address;
	struct msghdr msg = {
		/* Save some cycles and don't copy the address if not needed */
		.msg_name = addr ? (struct sockaddr *)&address : NULL,
	};
	struct socket *sock;
	struct iovec iov;
	int err, err2;
	int fput_needed;

	err = import_single_range(READ, ubuf, size, &iov, &msg.msg_iter);
	if (unlikely(err))
		return err;
	// 根据传入的fd找到socket对象
	sock = sockfd_lookup_light(fd, &err, &fput_needed);
	if (!sock)
		goto out;

	if (sock->file->f_flags & O_NONBLOCK)
		flags |= MSG_DONTWAIT;
	// 接收消息
	err = sock_recvmsg(sock, &msg, flags);

	if (err >= 0 && addr != NULL) {
		err2 = move_addr_to_user(&address,
					 msg.msg_namelen, addr, addr_len);
		if (err2 < 0)
			err = err2;
	}

	fput_light(sock->file, fput_needed);
out:
	return err;
}
int sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags)
{
	int err = security_socket_recvmsg(sock, msg, msg_data_left(msg), flags);

	// 接收消息
	return err ?: sock_recvmsg_nosec(sock, msg, flags);
}
static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
				     int flags)
{
	// 调用socket对象中ops的recvmsg函数
	return INDIRECT_CALL_INET(sock->ops->recvmsg, inet6_recvmsg,
				  inet_recvmsg, sock, msg, msg_data_left(msg),
				  flags);
}

网络连接会被抽象为一个文件,所以该函数中先通过fd查找socket对象,然后再调用sock_recvmsg函数来接收消息。本文仅分析IPV4,所以最后会调用inet_recvmsg函数。

v5.19.17
c
net/ipv4/af_inet.c
int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
		 int flags)
{
	struct sock *sk = sock->sk;
	int addr_len = 0;
	int err;

	if (likely(!(flags & MSG_ERRQUEUE)))
		sock_rps_record_flow(sk);

	// 根据协议是tcp还是udp来调用对应的recvmsg函数
	err = INDIRECT_CALL_2(sk->sk_prot->recvmsg, tcp_recvmsg, udp_recvmsg,
			      sk, msg, size, flags, &addr_len);
	if (err >= 0)
		msg->msg_namelen = addr_len;
	return err;
}

在该函数中,针对是TCP还是UDP调用了不同的方法,本文以最常用的TCP为例来分析。

TCP层

v5.19.17
c
net/ipv4/tcp.c
/*
 * 从sk的接收队列中拷贝数据到用户进程的内存空间中,
 * msghdr保存了用户空间的缓冲区信息,需要拷贝数据到这些缓冲区中
 */
int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int flags,
		int *addr_len)
{
	int cmsg_flags = 0, ret;
	struct scm_timestamping_internal tss;

	if (unlikely(flags & MSG_ERRQUEUE))
		return inet_recv_error(sk, msg, len, addr_len);

	if (sk_can_busy_loop(sk) &&
	    skb_queue_empty_lockless(&sk->sk_receive_queue) &&
	    sk->sk_state == TCP_ESTABLISHED)
		sk_busy_loop(sk, flags & MSG_DONTWAIT);

	/*
	 * 锁住sk,避免其他进程也来读该sk,以及软中断线程往sk的队列中写数据。
	 */
	lock_sock(sk);
	// 接收数据-为应用进程拷贝数据
	ret = tcp_recvmsg_locked(sk, msg, len, flags, &tss, &cmsg_flags);
	// 释放锁
	release_sock(sk);

	if ((cmsg_flags || msg->msg_get_inq) && ret >= 0) {
		if (cmsg_flags & TCP_CMSG_TS)
			tcp_recv_timestamp(msg, sk, &tss);
		if (msg->msg_get_inq) {
			msg->msg_inq = tcp_inq_hint(sk);
			if (cmsg_flags & TCP_CMSG_INQ)
				put_cmsg(msg, SOL_TCP, TCP_CM_INQ,
					 sizeof(msg->msg_inq), &msg->msg_inq);
		}
	}
	return ret;
}

该函数主要是做并发控制,防止多个线程读同一个socket,真正的读取操作是在tcp_recvmsg_locked函数中。

v5.19.17
c
net/ipv4/tcp.c
static int tcp_recvmsg_locked(struct sock *sk, struct msghdr *msg, size_t len,
			      int flags, struct scm_timestamping_internal *tss,
			      int *cmsg_flags)
{
	struct tcp_sock *tp = tcp_sk(sk);
	// 记录实际拷贝的数据的大小
	int copied = 0;
	u32 peek_seq;
	u32 *seq;
	unsigned long used;
	int err;
	int target;		/* Read at least this many bytes */
	long timeo;
	struct sk_buff *skb, *last;
	u32 urg_hole = 0;

	err = -ENOTCONN;
	// 还在监听状态,说明接收不了数据
	if (sk->sk_state == TCP_LISTEN)
		goto out;

	if (tp->recvmsg_inq) {
		*cmsg_flags = TCP_CMSG_INQ;
		msg->msg_get_inq = 1;
	}
	// 获取读数据的超时时间,如果标记中设置了MSG_DONTWAIT,说明是非阻塞模式
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);


	// 获取tcp_sock结构体中下一个拷贝字节序字��的地址
	seq = &tp->copied_seq;
	// MSG_PEEK是指只读数据,不修改copied_seq字段
	if (flags & MSG_PEEK) {
		peek_seq = tp->copied_seq;
		seq = &peek_seq;
	}

	// 获取需要拷贝的字节数
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

	/*
	 * 在循环中拷贝数据,直到len等于0,当然循环过程中有break的情况。
	 */
	do {
		u32 offset;


		/* Next get a buffer. */

		// 从receive队列末尾取数据
		last = skb_peek_tail(&sk->sk_receive_queue);
		// 遍历接收队列中的数据
		skb_queue_walk(&sk->sk_receive_queue, skb) {
			// 这里紧接着就对last进行赋值,那么上面两行的赋值有什么意义?
			last = skb;
			/* Now that we have two receive queues this
			 * shouldn't happen.
			 */
			// 出错的情况
			if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
				 "TCP recvmsg seq # bug: copied %X, seq %X, rcvnxt %X, fl %X\n",
				 *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
				 flags))
				break;

			// 获取当前skb中已经拷贝的字节数,通过上次拷贝的序号减去当前skb的起始序列号
			offset = *seq - TCP_SKB_CB(skb)->seq;
			// 如果TCP标��中包含SYN
			if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) {
				pr_err_once("%s: found a SYN, please report !\n", __func__);
				offset--;
			}
			/*
			 * 如果当前skb还没被拷贝完
			 * 注意这里不需要小于等于
			 */
			if (offset < skb->len)
				// 找到第一个需要拷贝的skb
				goto found_ok_skb;
			// 如果skb中包含FIN标记
			if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
				goto found_fin_ok;
			WARN(!(flags & MSG_PEEK),
			     "TCP recvmsg seq # bug 2: copied %X, seq %X, rcvnxt %X, fl %X\n",
			     *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
		}

		/* Well, if we have backlog, try to process it now yet. */

		// 如果已经拷贝到足够(target)的数据,而且backlog队列中已经没有节点了,则跳出循环
		if (copied >= target && !READ_ONCE(sk->sk_backlog.tail))
			break;


		// 如果已经拷贝的长度大于等于了目标长度,那么
		if (copied >= target) {
			/* Do not sleep, just process backlog. */
			__sk_flush_backlog(sk);
		} else { // 没有收到足够的数据,
			tcp_cleanup_rbuf(sk, copied);
			// 阻塞当前进程,阻塞时间由timeo决定
			sk_wait_data(sk, &timeo, last);
		}

		if ((flags & MSG_PEEK) &&
		    (peek_seq - copied - urg_hole != tp->copied_seq)) {
			net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK\n",
					    current->comm,
					    task_pid_nr(current));
			peek_seq = tp->copied_seq;
		}
		continue;

found_ok_skb:
		/* Ok so how much can we use? */
		// 计算还需要拷贝的字节数
		used = skb->len - offset;
		if (len < used)
			used = len;


		// MSG_TRUNC表示超出缓冲区
		if (!(flags & MSG_TRUNC)) {
			// 从skb中拷贝数据到msg中,offset是在skb中的偏移,而used则是需要拷贝的字节数
			err = skb_copy_datagram_msg(skb, offset, msg, used);
			if (err) {
				/* Exception. Bailout! */
				if (!copied)
					copied = -EFAULT;
				break;
			}
		}

		/*
		 * 计算并修改下一次拷贝期望的序列号,注意这里的修改是会反应到tcp_seq结构体中的copied_seq字段的,
		 * 因为上面的seq获取的是字段的地址
		 */
		WRITE_ONCE(*seq, *seq + used);
		// 累积已拷贝的数据长度
		copied += used;
		// 计算最新还需要拷贝的长度
		len -= used;

		tcp_rcv_space_adjust(sk);

	} while (len > 0);

	/* According to UNIX98, msg_name/msg_namelen are ignored
	 * on connected socket. I was just happy when found this 8) --ANK
	 */

	/* Clean up data we have read: This will do ACK frames. */
	tcp_cleanup_rbuf(sk, copied);
	return copied;

out:
	return err;

recv_urg:
	err = tcp_recv_urg(sk, msg, len, flags);
	goto out;

recv_sndq:
	err = tcp_peek_sndq(sk, msg, len);
	goto out;
}

该函数非常长,主要的逻辑在do...while循环中,会调用函数skb_copy_datagram_msg将数据从skb拷贝到用户传递下来的msg对象中。循环条件中的len就是上次用户传下来的所要读取的数据长度,直到读取到足够的数据才会返回。如果没有接收到足够的数据会做什么呢?如果没有数据,则当前线程会调用sk_wait_data函数并主动进入等待状态。

阻塞实现过程

v5.19.17
sk_wait_data
DEFINE_WAIT_FUNC
<
>
c
net/core/sock.c
int sk_wait_data(struct sock *sk, long *timeo, const struct sk_buff *skb)
{
	/*
	 * 创建一个名为wait的等待队列项,并注册回调函数woken_wake_function,
	 * 并把当前进程描述符current关联到其.private成员上。
	 */
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	int rc;

	/*
	 * 先调用sk_sleep函数获取sock对象下的等待队列列表头wait_queue_head,
	 * 然后调用add_wait_queue将新创建的wait这个等待队列项插入到sock对象的等待队列中。
	 *
	 * 后续等该socket上的数据就绪时,内核可以查找socket等待队列上的等待项,
	 * 进而可以找到回调函数和在等待该socket就绪事件的进程。
	 */
	add_wait_queue(sk_sleep(sk), &wait);
	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	// 让出CPU,进程将进入睡眠状态。
	rc = sk_wait_event(sk, timeo, skb_peek_tail(&sk->sk_receive_queue) != skb, &wait);
	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}
c
include/linux/wait.h
#define DEFINE_WAIT_FUNC(name, function)					\
	struct wait_queue_entry name = {					\
		.private	= current,					\
		.func		= function,					\
		.entry		= LIST_HEAD_INIT((name).entry),			\
	}

该函数主要就是3个核心步骤:

  • 创建等待项:封装了当前线程和回调函数;
  • 将等待项添加到sock对象的等待队列中;
  • 最后主动让出CPU;

进入阻塞

v5.19.17
c
include/net/sock.h
#define sk_wait_event(__sk, __timeo, __condition, __wait)		\
	({	int __rc;                                              \
        /* 让出CPU前,释放锁 */                                \
		release_sock(__sk);					\
		__rc = __condition;					\
		if (!__rc) {						\
			*(__timeo) = wait_woken(__wait,			\
						TASK_INTERRUPTIBLE,	\
						*(__timeo));		\
		}							\
		sched_annotate_sleep();      \
		/* 再次获取CPU,重新上锁 */  	\
		lock_sock(__sk);					\
		__rc = __condition;					\
		__rc;							\
	})                                                        \

在阻塞前后会分别释放和获取sock的锁,防止同时多个线程操作同一个sock对象。关键在于调用的wait_woken函数。

v5.19.17
c
kernel/sched/wait.c
long wait_woken(struct wait_queue_entry *wq_entry, unsigned mode, long timeout)
{
	/*
	 * The below executes an smp_mb(), which matches with the full barrier
	 * executed by the try_to_wake_up() in woken_wake_function() such that
	 * either we see the store to wq_entry->flags in woken_wake_function()
	 * or woken_wake_function() sees our store to current->state.
	 */
	set_current_state(mode); /* A */
	if (!(wq_entry->flags & WQ_FLAG_WOKEN) && !is_kthread_should_stop())
        // 等待最多timeout这么久
        // 具体实现在kernel/time/timer.c文件中
		timeout = schedule_timeout(timeout);
    // 重新将进程状态设置为RUNNING 
	__set_current_state(TASK_RUNNING);

	/*
	 * The below executes an smp_mb(), which matches with the smp_mb() (C)
	 * in woken_wake_function() such that either we see the wait condition
	 * being true or the store to wq_entry->flags in woken_wake_function()
	 * follows ours in the coherence order.
	 */
	smp_store_mb(wq_entry->flags, wq_entry->flags & ~WQ_FLAG_WOKEN); /* B */

	return timeout;
}

该函数中主要调用的是schedule_timeout函数来实现阻塞的。最后会调用到kernel/sched/core.c文件中的__schedule函数,在该函数中,会将当前线程从当前CPU的runq中移除,并从中获取下一个任务,然后进行实现上下文切换。由于是限时阻塞,所以会使用一个timer来控制阻塞的时间,时间一到timer就会重新放入CPU的runq中。这里的时间是sock对象的最大接收时间,在上面的tcp_recvmsg_locked函数中会获取,并一路传下来。

唤醒进程

有阻塞就有唤醒,在软中断线程接收到数据时,会执行tcp_rcv_established函数,会唤醒阻塞在sock对象上的进程。

c
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb)
{
	...
	tcp_data_ready(sk);
	...
}
v5.19.17
c
net/ipv4/tcp_input.c
void tcp_data_ready(struct sock *sk)
{
	if (tcp_epollin_ready(sk, sk->sk_rcvlowat) || sock_flag(sk, SOCK_DONE))
		// 数据已准备好,唤醒阻塞在socket上的进程
		sk->sk_data_ready(sk);
}

sock对象的sk_data_ready函数是什么?这样回到创建sock对象的时候,在介绍socket系统调用时,讲到在net/ipv4/af_inet.c文件中的inet_create函数中会创建sock对象并初始化它,就会设置它的sk_data_ready函数,具体设置的是sock_def_readable函数。

v5.19.17
c
net/core/sock.c
void sock_def_readable(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	// 如果有进程在此socket的等待队列中
	if (skwq_has_sleeper(wq))
		// 唤醒等待队列中的进程
		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
						EPOLLRDNORM | EPOLLRDBAND);
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	rcu_read_unlock();
}

在该函数中,会唤醒阻塞在该sock对象上的进程。

v5.19.17
wake_up_interruptible_sync_poll
__wake_up_sync_key
<
>
c
include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m)					\
	__wake_up_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))
c
kernel/sched/wait.c
void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
			void *key)
{
	if (unlikely(!wq_head))
		return;

	/*
	 * 唤醒进程,
	 * 这里传入的nr_exclusive是1,表示即使有多个进程阻塞在同一个socket上,也只唤醒一个进程。
	 * 其作用是为了避免”惊群“。
	 */
	__wake_up_common_lock(wq_head, mode, 1, WF_SYNC, key);
}
static void __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key)
{
	unsigned long flags;
	wait_queue_entry_t bookmark;

	bookmark.flags = 0;
	bookmark.private = NULL;
	bookmark.func = NULL;
	INIT_LIST_HEAD(&bookmark.entry);

	do {
		spin_lock_irqsave(&wq_head->lock, flags);
		// 实现进程唤醒
		nr_exclusive = __wake_up_common(wq_head, mode, nr_exclusive,
						wake_flags, key, &bookmark);
		spin_unlock_irqrestore(&wq_head->lock, flags);
	} while (bookmark.flags & WQ_FLAG_BOOKMARK);
}
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key,
			wait_queue_entry_t *bookmark)
{
	wait_queue_entry_t *curr, *next;
	int cnt = 0;

	lockdep_assert_held(&wq_head->lock);

	if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) {
		curr = list_next_entry(bookmark, entry);

		list_del(&bookmark->entry);
		bookmark->flags = 0;
	} else
		curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);

	if (&curr->entry == &wq_head->head)
		return nr_exclusive;

    // 遍历等待队列项
	list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
		unsigned flags = curr->flags;
		int ret;

		if (flags & WQ_FLAG_BOOKMARK)

			continue;

        // 调用回调函数
		ret = curr->func(curr, mode, wake_flags, key);
		if (ret < 0)
			break;
		if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
			break;

		if (bookmark && (++cnt > WAITQUEUE_WALK_BREAK_CNT) &&
				(&next->entry != &wq_head->head)) {
			bookmark->flags = WQ_FLAG_BOOKMARK;
			list_add_tail(&bookmark->entry, &next->entry);
			break;
		}
	}

	return nr_exclusive;
}

__wake_up_common函数中,会遍历sock对象的等待项,并调用每一项设置的回调。上面阻塞进程时,设置的回调函数是woken_wake_function

v5.19.17
c
kernel/sched/wait.c
int woken_wake_function(struct wait_queue_entry *wq_entry, unsigned mode, int sync, void *key)
{
	/* Pairs with the smp_store_mb() in wait_woken(). */
	smp_mb(); /* C */
	wq_entry->flags |= WQ_FLAG_WOKEN;

	return default_wake_function(wq_entry, mode, sync, key);
}

在该函数中,会调用到线程调度相关的函数。

v5.19.17
c
kernel/sched/core.c
int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags,
			  void *key)
{
	WARN_ON_ONCE(IS_ENABLED(CONFIG_SCHED_DEBUG) && wake_flags & ~WF_SYNC);
	return try_to_wake_up(curr->private, mode, wake_flags);
}

真正的实现在try_to_wake_up函数中,这个函数比较长,主要的逻辑是重新为进程选择一个CPU,然后获取对应的runq,并将进程放入进去,从而该进程就可以被调度了。

v5.19.17
c
kernel/sched/core.c
static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
	unsigned long flags;
	int cpu, success = 0;


	// 选择一个合适的CPU
	cpu = select_task_rq(p, p->wake_cpu, wake_flags | WF_TTWU);
	if (task_cpu(p) != cpu) {
		if (p->in_iowait) {
			delayacct_blkio_end(p);
			atomic_dec(&task_rq(p)->nr_iowait);
		}

		wake_flags |= WF_MIGRATED;
		psi_ttwu_dequeue(p);
		// 为进程设置CPU
		set_task_cpu(p, cpu);
	}
#else
	cpu = task_cpu(p);
#endif /* CONFIG_SMP */

	// 将任务添加到CPU运行队列中
	ttwu_queue(p, cpu, wake_flags);
unlock:
	raw_spin_unlock_irqrestore(&p->pi_lock, flags);
out:
	if (success)
		ttwu_stat(p, task_cpu(p), wake_flags);
	preempt_enable();

	return success;
}

总结

虽然本文明面上是在介绍recv系统调用,但实际上更多的是在介绍进程的阻塞和唤醒。阻塞的过程就是从所在的CPU的runq中取出来,如果是限时阻塞,则设置一个timer,时间一到就重新加入进去。唤醒的过程就是重新选择一个CPU,并将进程添加到它的runq中去。