我是靠谱客的博主 野性寒风,最近开发中收集的这篇文章主要介绍linux内核协议栈 UDP之数据报接收过程Ⅱ1 系统调用 udp_recvmsg()2 后备队列 sk_backlog 中的skb处理 release_sock(),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

1 系统调用 udp_recvmsg()

1.1 从接收队列 sk_receive_queue 中获取skb

1.1.1 获取队列头不删除 skb_peek()

1.1.2 将 skb 从移除队列中 __skb_unlink()

1.2 尝试释放skb内存 skb_free_datagram_locked()

2 后备队列 sk_backlog 中的skb处理 release_sock()

2.1 后备队列skb进入接收队列 sk_backlog_rcv()


1 系统调用 udp_recvmsg()

对于应用程序而言,读操作可以通过多个系统调用实现,如read()、recv()、recvfrom()等等,但是这些系统调用到了传输层协议,都调用到了同一接口,对于UDP就是udp_recvmsg()。

int udp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
		size_t len, int noblock, int flags, int *addr_len)
{
	struct inet_sock *inet = inet_sk(sk);
	struct sockaddr_in *sin = (struct sockaddr_in *)msg->msg_name;
	struct sk_buff *skb;
	unsigned int ulen, copied;
	int peeked;
	int err;
	int is_udplite = IS_UDPLITE(sk);

	//需要返回源地址信息,设置源地址长度
	if (addr_len)
		*addr_len = sizeof(*sin);

	//如果设置了MSG_ERRQUEUE标记,那么只读取错误信息
	if (flags & MSG_ERRQUEUE)
		return ip_recv_error(sk, msg, len);

try_again:
	//根据是否需要阻塞,从接收队列中取出一个SKB
	skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
				  &peeked, &err);
	if (!skb)
		goto out;

	//ulen为该SKB中包含的应用数据长度
	ulen = skb->len - sizeof(struct udphdr);
	//len为应用程序指定的buffer大小,所以下面的逻辑含义为:
	//1. 如果应用提供的buffer超过了该数据包的数据长度,那么调整要拷贝的数据量为该SKB中实际数据量
	//2. 如果应用提供的buffer不够大,那么需要截断数据包,设置截断标记
	copied = len;
	if (copied > ulen)
		copied = ulen;
	else if (copied < ulen)
		msg->msg_flags |= MSG_TRUNC;

	/*
	 * If checksum is needed at all, try to do it while copying the
	 * data.  If the data is truncated, or if we only want a partial
	 * coverage checksum (UDP-Lite), do it before the copy.
	 */
	//条件一:对于截断的数据包和尚未完成校验的数据包,先进行校验,校验出错则尝试读取下一个数据包
	//条件二:实际上只用于UDPLite,因为UDP协议的校验在接收过程的第一步就完成了
	if (copied < ulen || UDP_SKB_CB(skb)->partial_cov) {
		if (udp_lib_checksum_complete(skb))
			goto csum_copy_err;
	}
	//根据是否需要校验,调用不同的数据拷贝函数
	if (skb_csum_unnecessary(skb))
		err = skb_copy_datagram_iovec(skb, sizeof(struct udphdr),
					      msg->msg_iov, copied);
	else {
	    //在数据拷贝过程中还会进行校验
		err = skb_copy_and_csum_datagram_iovec(skb,
						       sizeof(struct udphdr),
						       msg->msg_iov);
		if (err == -EINVAL)
			goto csum_copy_err;
	}
	//数据拷贝失败,返回错误
	if (err)
		goto out_free;
	//只有非PEEK读取才更新统计信息
	if (!peeked)
		UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_INDATAGRAMS, is_udplite);
	//更新数据包接收的时间到sk->sk_stamp中
	sock_recv_timestamp(msg, sk, skb);

	//拷贝数据包源地址信息,该地址会返回给应用程序
	if (sin) {
		sin->sin_family = AF_INET;
		sin->sin_port = udp_hdr(skb)->source;
		sin->sin_addr.s_addr = ip_hdr(skb)->saddr;
		memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
	}
	//获取控制信息
	if (inet->cmsg_flags)
		ip_cmsg_recv(msg, skb);

	//读取成功,返回值err表示的是已经读取到的字节数
	err = copied;
	if (flags & MSG_TRUNC)
		err = ulen;

out_free:
	//释放该SKB的数据
	skb_free_datagram_locked(sk, skb);
out:
	return err;

csum_copy_err:
	lock_sock(sk);
	if (!skb_kill_datagram(sk, skb, flags))
		UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
	release_sock(sk);

	if (noblock)
		return -EAGAIN;
	goto try_again;
}

1.1 从接收队列 sk_receive_queue 中获取skb

/**
 *	__skb_recv_datagram - Receive a datagram skbuff
 *	@sk: socket
 *	@flags: MSG_ flags
 *	@peeked: returns non-zero if this packet has been seen before
 *	@err: error code returned
 */
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags,
				    int *peeked, int *err)
{
	struct sk_buff *skb;
	long timeo;

    //如果该socket遇到了错误,返回错误
	int error = sock_error(sk);
	if (error)
		goto no_packet;

	//根据是否设置了非阻塞标记,获取超时时间。对于非阻塞模式,timeo为0
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

	do {
		/* Again only user level code calls this function, so nothing
		 * interrupt level will suddenly eat the receive_queue.
		 *
		 * Look at current nfs client by the way...
		 * However, this function was corrent in any case. 8)
		 */
		unsigned long cpu_flags;
		//关中断并且持有接收队列的锁
		spin_lock_irqsave(&sk->sk_receive_queue.lock, cpu_flags);
		//获取接收队列中的第一个skb
		skb = skb_peek(&sk->sk_receive_queue);
		if (skb) {
			*peeked = skb->peeked;
			//如果设置了MSG_PEEK标记,那么设置skb的peek标记,并且增加对skb的引用计数,
			//该标记很重要,会影响是否释放该skb,见下文的总结
			if (flags & MSG_PEEK) {
				skb->peeked = 1;
				atomic_inc(&skb->users);
			} else
			    //非MSG_PEEK场景,将该skb从接收队列中移除
				__skb_unlink(skb, &sk->sk_receive_queue);
		}
		//释放接收队列锁并开启中断
		spin_unlock_irqrestore(&sk->sk_receive_queue.lock, cpu_flags);
		//找到了skb,直接返回
		if (skb)
			return skb;

        //当前接收队列为空,如果超时时间为0,即非阻塞模式,那么直接返回EAGAIN错误
		/* User doesn't want to wait */
		error = -EAGAIN;
		if (!timeo)
			goto no_packet;
		//没有可读数据,需要阻塞等待数据可用,阻塞在了sk->sk_sleep等待队列上
	} while (!wait_for_packet(sk, err, &timeo));

	return NULL;

no_packet:
	*err = error;
	return NULL;
}

1.1.1 获取队列头不删除 skb_peek()

/**
 *	skb_peek
 *	@list_: list to peek at
 *
 *	Peek an &sk_buff. Unlike most other operations you _MUST_
 *	be careful with this one. A peek leaves the buffer on the
 *	list and someone else may run off with it. You must hold
 *	the appropriate locks or have a private queue to do this.
 *
 *	Returns %NULL for an empty list or a pointer to the head element.
 *	The reference count is not incremented and the reference is therefore
 *	volatile. Use with caution.
 */
//如注释所述,使用该函数需要小心,保证不会有并发问题。这里是在持有锁的情况下操作的
static inline struct sk_buff *skb_peek(struct sk_buff_head *list_)
{
	//该函数会返回list中第一个skb的指针,但是并不会将该skb从队列中移除,这点很重要
	struct sk_buff *list = ((struct sk_buff *)list_)->next;
	if (list == (struct sk_buff *)list_)
		list = NULL;
	return list;
}

1.1.2 将 skb 从移除队列中 __skb_unlink()

/*
 * remove sk_buff from list. _Must_ be called atomically, and with
 * the list known..
 */
//将skb从队列list中移除,典型的链表操作
static inline void __skb_unlink(struct sk_buff *skb, struct sk_buff_head *list)
{
	struct sk_buff *next, *prev;

	list->qlen--;
	next	   = skb->next;
	prev	   = skb->prev;
	skb->next  = skb->prev = NULL;
	next->prev = prev;
	prev->next = next;
}

1.2 尝试释放skb内存 skb_free_datagram_locked()

该函数尝试释放SKB,但是要注意,是否真的会释放最终取决于SKB自己维护的引用计数。

/*
 * Read buffer destructor automatically called from kfree_skb.
 */
void sock_rfree(struct sk_buff *skb)
{
	struct sock *sk = skb->sk;
	//该SKB将被释放,所以递减传输控制块占用的内存记账
	atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
	sk_mem_uncharge(skb->sk, skb->truesize);
}

void skb_free_datagram_locked(struct sock *sk, struct sk_buff *skb)
{
	//因为如果真的触发释放SKB,那么会调用skb->destructor()回调,在接收过程的第一步,找到
	//传输控制块后,使用skb_set_owner_r()将该skb的属主设置成了当前传输控制块,当时指定的
	//回调函数是sock_rfree(),在该函数中会操作传输控制块的成员,所以这里需要提前锁定
	lock_sock(sk);
	skb_free_datagram(sk, skb);
	release_sock(sk);
}

void skb_free_datagram(struct sock *sk, struct sk_buff *skb)
{
	consume_skb(skb);
	sk_mem_reclaim_partial(sk);
}

/**
 *	consume_skb - free an skbuff
 *	@skb: buffer to free
 *
 *	Drop a ref to the buffer and free it if the usage count has hit zero
 *	Functions identically to kfree_skb, but kfree_skb assumes that the frame
 *	is being dropped after a failure and notes that
 */
void consume_skb(struct sk_buff *skb)
{
	if (unlikely(!skb))
		return;
    //如果该skb的引用计数为1,那么需要真的释放
	if (likely(atomic_read(&skb->users) == 1))
		smp_rmb();
    //如果skb引用计数大于1,那么仅仅是将其引用计数减1
	else if (likely(!atomic_dec_and_test(&skb->users)))
		return;
	__kfree_skb(skb);
}

到此,回忆一下前面设置了MSG_PEEK的处理,对于此种情况,在调用__skb_recv_datagram()时并不会真的将skb从接收队列中移除,只是返回其指针,并且增加了对该skb的引用计数,所以在接收完毕后调用skb_free_datagram_locked()的时候,该skb的引用计数至少为2,并不会真正的释放。

2 后备队列 sk_backlog 中的skb处理 release_sock()

在《linux内核协议栈 UDP之数据报接收过程Ⅰ》中有提到,在软中断接收过程中,如果当前传输控制块刚好被进程上下文锁定,那么只是将数据放入到后备队列中,我们并没有介绍该队列中的数据又是如何被应用接收的。实际上,在进程上下文中调用release_sock()的时候会处理该后备队列,代码如下:

void release_sock(struct sock *sk)
{
	/*
	 * The sk_lock has mutex_unlock() semantics:
	 */
	mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);

	spin_lock_bh(&sk->sk_lock.slock);
	//重点看这里,如果后备队列不为空,调用__release_sock()进行处理
	if (sk->sk_backlog.tail)
		__release_sock(sk);
	sk->sk_lock.owned = 0;
	if (waitqueue_active(&sk->sk_lock.wq))
		wake_up(&sk->sk_lock.wq);
	spin_unlock_bh(&sk->sk_lock.slock);
}

关于传输控制块的同步锁可以参考笔记《linux内核协议栈 套接口层之传输控制块同步锁socket_lock_t》

static void __release_sock(struct sock *sk)
{
	//获取后备队列第一个元素
	struct sk_buff *skb = sk->sk_backlog.head;

	do {
	    //这里先将后备队列清空,然后打开硬中断,但是软中没有打开。
	    //由于对数据包的处理比较耗时,这种处理方式可以提高系统性能
		sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
		bh_unlock_sock(sk);

		//循环处理后备队列中数据包
		do {
			struct sk_buff *next = skb->next;

			skb->next = NULL;
			//处理该数据包
			sk_backlog_rcv(sk, skb);

			/*
			 * We are in process context here with softirqs
			 * disabled, use cond_resched_softirq() to preempt.
			 * This is safe to do because we've taken the backlog
			 * queue private:
			 */
			//重新调度一下下半部
			cond_resched_softirq();

			skb = next;
		} while (skb != NULL);

		//再次持锁,因为要判断传输控制块的后备队列是否为空。因为前面重新调度过软中断,
		//所以下面的外层循环可以保证能够处理新到来的数据包
		bh_lock_sock(sk);
	} while ((skb = sk->sk_backlog.head) != NULL);
}

2.1 后备队列skb进入接收队列 sk_backlog_rcv()

static inline int sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{
	//对于UDP,该回调函数就是__udp_queue_rcv_skb(),
    //对于TCP,该回调函数则是 tcp_v4_do_rcv()
	//在软中断中就是使用该函数将数据包放入了接收队列
	return sk->sk_backlog_rcv(sk, skb);
}

最后

以上就是野性寒风为你收集整理的linux内核协议栈 UDP之数据报接收过程Ⅱ1 系统调用 udp_recvmsg()2 后备队列 sk_backlog 中的skb处理 release_sock()的全部内容,希望文章能够帮你解决linux内核协议栈 UDP之数据报接收过程Ⅱ1 系统调用 udp_recvmsg()2 后备队列 sk_backlog 中的skb处理 release_sock()所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(65)

评论列表共有 0 条评论

立即
投稿
返回
顶部