概述
目录
1 系统调用 udp_recvmsg()
1.1 从接收队列 sk_receive_queue 中获取skb
1.1.1 获取队列头不删除 skb_peek()
1.1.2 将 skb 从移除队列中 __skb_unlink()
1.2 尝试释放skb内存 skb_free_datagram_locked()
2 后备队列 sk_backlog 中的skb处理 release_sock()
2.1 后备队列skb进入接收队列 sk_backlog_rcv()
1 系统调用 udp_recvmsg()
对于应用程序而言,读操作可以通过多个系统调用实现,如read()、recv()、recvfrom()等等,但是这些系统调用到了传输层协议,都调用到了同一接口,对于UDP就是udp_recvmsg()。
int udp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
size_t len, int noblock, int flags, int *addr_len)
{
struct inet_sock *inet = inet_sk(sk);
struct sockaddr_in *sin = (struct sockaddr_in *)msg->msg_name;
struct sk_buff *skb;
unsigned int ulen, copied;
int peeked;
int err;
int is_udplite = IS_UDPLITE(sk);
//需要返回源地址信息,设置源地址长度
if (addr_len)
*addr_len = sizeof(*sin);
//如果设置了MSG_ERRQUEUE标记,那么只读取错误信息
if (flags & MSG_ERRQUEUE)
return ip_recv_error(sk, msg, len);
try_again:
//根据是否需要阻塞,从接收队列中取出一个SKB
skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
&peeked, &err);
if (!skb)
goto out;
//ulen为该SKB中包含的应用数据长度
ulen = skb->len - sizeof(struct udphdr);
//len为应用程序指定的buffer大小,所以下面的逻辑含义为:
//1. 如果应用提供的buffer超过了该数据包的数据长度,那么调整要拷贝的数据量为该SKB中实际数据量
//2. 如果应用提供的buffer不够大,那么需要截断数据包,设置截断标记
copied = len;
if (copied > ulen)
copied = ulen;
else if (copied < ulen)
msg->msg_flags |= MSG_TRUNC;
/*
* If checksum is needed at all, try to do it while copying the
* data. If the data is truncated, or if we only want a partial
* coverage checksum (UDP-Lite), do it before the copy.
*/
//条件一:对于截断的数据包和尚未完成校验的数据包,先进行校验,校验出错则尝试读取下一个数据包
//条件二:实际上只用于UDPLite,因为UDP协议的校验在接收过程的第一步就完成了
if (copied < ulen || UDP_SKB_CB(skb)->partial_cov) {
if (udp_lib_checksum_complete(skb))
goto csum_copy_err;
}
//根据是否需要校验,调用不同的数据拷贝函数
if (skb_csum_unnecessary(skb))
err = skb_copy_datagram_iovec(skb, sizeof(struct udphdr),
msg->msg_iov, copied);
else {
//在数据拷贝过程中还会进行校验
err = skb_copy_and_csum_datagram_iovec(skb,
sizeof(struct udphdr),
msg->msg_iov);
if (err == -EINVAL)
goto csum_copy_err;
}
//数据拷贝失败,返回错误
if (err)
goto out_free;
//只有非PEEK读取才更新统计信息
if (!peeked)
UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_INDATAGRAMS, is_udplite);
//更新数据包接收的时间到sk->sk_stamp中
sock_recv_timestamp(msg, sk, skb);
//拷贝数据包源地址信息,该地址会返回给应用程序
if (sin) {
sin->sin_family = AF_INET;
sin->sin_port = udp_hdr(skb)->source;
sin->sin_addr.s_addr = ip_hdr(skb)->saddr;
memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
}
//获取控制信息
if (inet->cmsg_flags)
ip_cmsg_recv(msg, skb);
//读取成功,返回值err表示的是已经读取到的字节数
err = copied;
if (flags & MSG_TRUNC)
err = ulen;
out_free:
//释放该SKB的数据
skb_free_datagram_locked(sk, skb);
out:
return err;
csum_copy_err:
lock_sock(sk);
if (!skb_kill_datagram(sk, skb, flags))
UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
release_sock(sk);
if (noblock)
return -EAGAIN;
goto try_again;
}
1.1 从接收队列 sk_receive_queue 中获取skb
/**
* __skb_recv_datagram - Receive a datagram skbuff
* @sk: socket
* @flags: MSG_ flags
* @peeked: returns non-zero if this packet has been seen before
* @err: error code returned
*/
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags,
int *peeked, int *err)
{
struct sk_buff *skb;
long timeo;
//如果该socket遇到了错误,返回错误
int error = sock_error(sk);
if (error)
goto no_packet;
//根据是否设置了非阻塞标记,获取超时时间。对于非阻塞模式,timeo为0
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
do {
/* Again only user level code calls this function, so nothing
* interrupt level will suddenly eat the receive_queue.
*
* Look at current nfs client by the way...
* However, this function was corrent in any case. 8)
*/
unsigned long cpu_flags;
//关中断并且持有接收队列的锁
spin_lock_irqsave(&sk->sk_receive_queue.lock, cpu_flags);
//获取接收队列中的第一个skb
skb = skb_peek(&sk->sk_receive_queue);
if (skb) {
*peeked = skb->peeked;
//如果设置了MSG_PEEK标记,那么设置skb的peek标记,并且增加对skb的引用计数,
//该标记很重要,会影响是否释放该skb,见下文的总结
if (flags & MSG_PEEK) {
skb->peeked = 1;
atomic_inc(&skb->users);
} else
//非MSG_PEEK场景,将该skb从接收队列中移除
__skb_unlink(skb, &sk->sk_receive_queue);
}
//释放接收队列锁并开启中断
spin_unlock_irqrestore(&sk->sk_receive_queue.lock, cpu_flags);
//找到了skb,直接返回
if (skb)
return skb;
//当前接收队列为空,如果超时时间为0,即非阻塞模式,那么直接返回EAGAIN错误
/* User doesn't want to wait */
error = -EAGAIN;
if (!timeo)
goto no_packet;
//没有可读数据,需要阻塞等待数据可用,阻塞在了sk->sk_sleep等待队列上
} while (!wait_for_packet(sk, err, &timeo));
return NULL;
no_packet:
*err = error;
return NULL;
}
1.1.1 获取队列头不删除 skb_peek()
/**
* skb_peek
* @list_: list to peek at
*
* Peek an &sk_buff. Unlike most other operations you _MUST_
* be careful with this one. A peek leaves the buffer on the
* list and someone else may run off with it. You must hold
* the appropriate locks or have a private queue to do this.
*
* Returns %NULL for an empty list or a pointer to the head element.
* The reference count is not incremented and the reference is therefore
* volatile. Use with caution.
*/
//如注释所述,使用该函数需要小心,保证不会有并发问题。这里是在持有锁的情况下操作的
static inline struct sk_buff *skb_peek(struct sk_buff_head *list_)
{
//该函数会返回list中第一个skb的指针,但是并不会将该skb从队列中移除,这点很重要
struct sk_buff *list = ((struct sk_buff *)list_)->next;
if (list == (struct sk_buff *)list_)
list = NULL;
return list;
}
1.1.2 将 skb 从移除队列中 __skb_unlink()
/*
* remove sk_buff from list. _Must_ be called atomically, and with
* the list known..
*/
//将skb从队列list中移除,典型的链表操作
static inline void __skb_unlink(struct sk_buff *skb, struct sk_buff_head *list)
{
struct sk_buff *next, *prev;
list->qlen--;
next = skb->next;
prev = skb->prev;
skb->next = skb->prev = NULL;
next->prev = prev;
prev->next = next;
}
1.2 尝试释放skb内存 skb_free_datagram_locked()
该函数尝试释放SKB,但是要注意,是否真的会释放最终取决于SKB自己维护的引用计数。
/*
* Read buffer destructor automatically called from kfree_skb.
*/
void sock_rfree(struct sk_buff *skb)
{
struct sock *sk = skb->sk;
//该SKB将被释放,所以递减传输控制块占用的内存记账
atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
sk_mem_uncharge(skb->sk, skb->truesize);
}
void skb_free_datagram_locked(struct sock *sk, struct sk_buff *skb)
{
//因为如果真的触发释放SKB,那么会调用skb->destructor()回调,在接收过程的第一步,找到
//传输控制块后,使用skb_set_owner_r()将该skb的属主设置成了当前传输控制块,当时指定的
//回调函数是sock_rfree(),在该函数中会操作传输控制块的成员,所以这里需要提前锁定
lock_sock(sk);
skb_free_datagram(sk, skb);
release_sock(sk);
}
void skb_free_datagram(struct sock *sk, struct sk_buff *skb)
{
consume_skb(skb);
sk_mem_reclaim_partial(sk);
}
/**
* consume_skb - free an skbuff
* @skb: buffer to free
*
* Drop a ref to the buffer and free it if the usage count has hit zero
* Functions identically to kfree_skb, but kfree_skb assumes that the frame
* is being dropped after a failure and notes that
*/
void consume_skb(struct sk_buff *skb)
{
if (unlikely(!skb))
return;
//如果该skb的引用计数为1,那么需要真的释放
if (likely(atomic_read(&skb->users) == 1))
smp_rmb();
//如果skb引用计数大于1,那么仅仅是将其引用计数减1
else if (likely(!atomic_dec_and_test(&skb->users)))
return;
__kfree_skb(skb);
}
到此,回忆一下前面设置了MSG_PEEK的处理,对于此种情况,在调用__skb_recv_datagram()时并不会真的将skb从接收队列中移除,只是返回其指针,并且增加了对该skb的引用计数,所以在接收完毕后调用skb_free_datagram_locked()的时候,该skb的引用计数至少为2,并不会真正的释放。
2 后备队列 sk_backlog 中的skb处理 release_sock()
在《linux内核协议栈 UDP之数据报接收过程Ⅰ》中有提到,在软中断接收过程中,如果当前传输控制块刚好被进程上下文锁定,那么只是将数据放入到后备队列中,我们并没有介绍该队列中的数据又是如何被应用接收的。实际上,在进程上下文中调用release_sock()的时候会处理该后备队列,代码如下:
void release_sock(struct sock *sk)
{
/*
* The sk_lock has mutex_unlock() semantics:
*/
mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);
spin_lock_bh(&sk->sk_lock.slock);
//重点看这里,如果后备队列不为空,调用__release_sock()进行处理
if (sk->sk_backlog.tail)
__release_sock(sk);
sk->sk_lock.owned = 0;
if (waitqueue_active(&sk->sk_lock.wq))
wake_up(&sk->sk_lock.wq);
spin_unlock_bh(&sk->sk_lock.slock);
}
关于传输控制块的同步锁可以参考笔记《linux内核协议栈 套接口层之传输控制块同步锁socket_lock_t》
static void __release_sock(struct sock *sk)
{
//获取后备队列第一个元素
struct sk_buff *skb = sk->sk_backlog.head;
do {
//这里先将后备队列清空,然后打开硬中断,但是软中没有打开。
//由于对数据包的处理比较耗时,这种处理方式可以提高系统性能
sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
bh_unlock_sock(sk);
//循环处理后备队列中数据包
do {
struct sk_buff *next = skb->next;
skb->next = NULL;
//处理该数据包
sk_backlog_rcv(sk, skb);
/*
* We are in process context here with softirqs
* disabled, use cond_resched_softirq() to preempt.
* This is safe to do because we've taken the backlog
* queue private:
*/
//重新调度一下下半部
cond_resched_softirq();
skb = next;
} while (skb != NULL);
//再次持锁,因为要判断传输控制块的后备队列是否为空。因为前面重新调度过软中断,
//所以下面的外层循环可以保证能够处理新到来的数据包
bh_lock_sock(sk);
} while ((skb = sk->sk_backlog.head) != NULL);
}
2.1 后备队列skb进入接收队列 sk_backlog_rcv()
static inline int sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{
//对于UDP,该回调函数就是__udp_queue_rcv_skb(),
//对于TCP,该回调函数则是 tcp_v4_do_rcv()
//在软中断中就是使用该函数将数据包放入了接收队列
return sk->sk_backlog_rcv(sk, skb);
}
最后
以上就是野性寒风为你收集整理的linux内核协议栈 UDP之数据报接收过程Ⅱ1 系统调用 udp_recvmsg()2 后备队列 sk_backlog 中的skb处理 release_sock()的全部内容,希望文章能够帮你解决linux内核协议栈 UDP之数据报接收过程Ⅱ1 系统调用 udp_recvmsg()2 后备队列 sk_backlog 中的skb处理 release_sock()所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复