概述
文章目录
- channel实现方式
- 说明
- 思考包含的主要场景
- 源码实现
- hchan 介绍
- 创建 hchan
- 往chan里面发送数据
- 函数 chansend
- 函数 send
- 从chan里面获取数据
- 函数 chanrecv
- 函数 recv
- 关闭 chan
- 总结
channel实现方式
说明
- chan怎么使用不是本文的主题
- 本文的chan的是基于golang 1.13,系统是mac os
- 本文的思路:自己思考chan有哪些点,他是哪些特点,带着这些特点,去代码中寻找答案
思考包含的主要场景
从过程来看,chan主要涉及到三个大点,
第一怎么创建一个chan
第二怎么发送和接受数据
第三怎么关闭chan
具体上,应该包含这些
- 创建channel
- 有缓存
- 无缓存
- 往channel里面发送数据
- 有缓存
- 无缓存
- 有接收者
- 无接受者
- 从channel里面读取数据
- 有缓存
- 无缓存
- 有数据
- 无数据
- 关闭chan
源码实现
当前代码从/runtime/hchan.go
hchan 介绍
hchan结构体,主要是缓冲区的存贮buf,是一个数组,以及两个队列(双向链表实现)sendq和recvq,主要是利用队列的先进先出的特性,完成chan的发送和接受的顺序,通过lock保证并发正确性
type hchan struct {
qcount
uint
// total data in the queue,当前data数
dataqsiz uint
// size of the circular queue //缓存的大小
buf
unsafe.Pointer // points to an array of dataqsiz elements 存放缓存的一个数组,通过index实现的环形数组
elemsize uint16
//chan里面的元素的大小,如chan int,则表示int的大小
elemtype *_type // element type 元素的类型信息
closed
uint32
//是否已经关闭,1表示已经关闭,0表示开着
sendx
uint
// send index 发送者的index, 有缓冲区使用
recvx
uint
// receive index 接收者的index,有缓冲区使用
recvq
waitq
// list of recv waiters
接受者的队列(使用双向链表实现),当缓冲区无数据,会放到里面
sendq
waitq
// list of send waiters 发送者的队列(使用双向链表实现),缓冲区已经满了或者无缓冲区的时候,放到里面
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //锁,保护hchan的所有元素已经sudogs的元素
}
创建 hchan
代码入口:
函数:makechan64或者makechan,我们从makechan入手
func makechan(t *chantype, size int) *hchan {
...
}
size即缓存的大小
chantype是通过编译过来的chan的类型,包含chan类型信息
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// elem.size*size算出缓冲区的大小,以及是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0: //无缓冲区
// Queue or element size is zero.
//直接分配hchan size的内存,c指向它
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers. 元素不包含指针
// Allocate hchan and buf in one call.
直接一次分配hchan和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers. 包含指针,使用new先把hchan分配好,在分配buf的内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
//元素大小
c.elemtype = elem //元素信息
c.dataqsiz = uint(size) //缓冲区的大小
if debugChan { // 可以自己设置debugChan为true,自己想打印源代码,可以使用print函数
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "n")
}
return c
}
往chan里面发送数据
函数 chansend
- 无缓冲区
- 如果有recvq里面有receiver,直接把值给它
- 如果没有recvq,将数据封装成sudog,放到sendq里,将当前g挂起来
- 有缓冲区
- 如果有recvq里面有receiver,直接把值给它
- 如果没有recvq
- 如果缓冲区没有满,直接放到缓冲区buf里面
- 如果缓冲区满了,将数据封装成sudog,放到 sendq里面,将当前g挂起来
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
}
// c hchan的指针,
// block是否阻塞,select
有个default是不阻塞的即selectnbsend函数的实现
//ep 发送的数据的指针
// callerpc 调用者的程序计数器
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
//针对不阻塞的场景,未关闭,无缓冲区,则直接返回false,走select的default场景
//或者有缓冲区,但是缓冲区已经满了场景直接返回false
if !block && c.closed == 0 && (c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 开始上锁,保证并发安全性
lock(&c.lock)
//如果已经关闭,再发送数据,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 在recvq的双向链表里面获取第一个sudog,如果获取到,表明当前已经有接受者,则进行发送数据,send将讲述怎么发送,在send函数的解释里面说
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 有缓冲区的场景
if c.qcount < c.dataqsiz {
//下面的操作是ep位置的数据copy到buf sendx位置,即实现数据的缓冲,然后unlock,返回true
// Space is available in the channel buffer. Enqueue the element to send. buf对应sendx的指针
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
//把ep地址指向的数据move到qp地址上面
typedmemmove(c.elemtype, qp, ep)
c.sendx++
//循环列表
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 如果是不阻塞的,直接返回,因为下面的逻辑都是缓冲区是空的,所以直接返回
if !block {
unlock(&c.lock)
return false
}
//
缓冲区为空,则将ep放到sudog里面,然后将sudog放到sendq队列里面去,然后将当前g挂起
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
//这之前当前的g是挂住的,后续代码不会进行,一直到接受者,接受这个数据,才会将会将g恢复到运行状态
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
函数 send
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//表示是否启用数据竞争检测
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
//如果要返回值的话,将ep的数据move到receiver的elem去
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 释放receiver对应的g,让对应的g继续运行
goready(gp, skip+1)
}
从chan里面获取数据
函数 chanrecv
- 无缓冲区
- 如果sendq里面有数据,直接sendq获取第一个sudog,获取值
- 如果没有sendq,将数据封装成sudog,放到recvq里面
- 有缓冲区
- 如果sendq里面有数据,取出一个sudog,从缓冲区位置recvx取出值, 放置到返回值里面,然后将sudog里面的值放到刚刚从缓冲区取值的位置recvx里面
- 如果缓冲区有数据,从缓冲区取值
- 如果缓冲区无数据,封装成sudog,放到recvq里面
// c 当前hchan
// ep返回的值的地址,如果为nil,则忽略返回值
// block是否阻塞,跟send的block一个含义
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//如果不阻塞,无缓冲去为0 或者当前缓冲区已经满了 且未关闭,直接返回,因为这些都是阻塞式的
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 开始上锁
lock(&c.lock)
// 当前chan已经关闭,并且已经没有数据了,直接返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 从发送队列里面获取一个sudog,如果获取不为空,则进行接收数据,具体recv的说明在后面,这个分为两个场景,一个为缓冲区已经满了,放到了sendq队列,一个是缓冲区为0,直接放到sendq,需要先把sendq里面的数据取出来,将buf里第recv个数据赋值给ep(如果有返回值的话),然后将取出来的sudog里的elem放到buf里的第recv,保证是队列的先进先出
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓冲区里面有数据,分为两个场景,一个为chan已经关闭,一个chan正常
if c.qcount > 0 {
// Receive directly from queue
// 从buf里面获取当前recvx的数据
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 有返回值,则将qp指针指向的数据拷贝到ep指针位置
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//然后将buf revcx位置的数据清空
typedmemclr(c.elemtype, qp)
// recvx加1,如果buf数组都读取完了,直接将recvx设置为0,实现一个循环数组
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 当前数据个数减1,unlock,返回
c.qcount--
unlock(&c.lock)
return true, true
}
// 无缓冲区,不阻塞的读数据,直接返回
if !block {
unlock(&c.lock)
return false, false
}
//如果缓冲区为0或者缓冲区里面没有数据,则将当前ep封装为sudog,然后放到队列recvq里面,然后将当前g挂起
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
函数 recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
// 无缓冲区,也要返回elem,则把sg里面存的值赋值给ep,返回
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
// 要返回值,则将buf里面的第recvx个里面对应值,赋值给ep
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
// 然后把sg里面的elem值赋值到buf里面的第recvx个里面(即qp),环形的buf,保证先把buf里面的值读取完,然后读取sendq里面的sudog里面的值
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
//环形数组
c.recvx = 0
}
// 下个要往缓冲区写入的index
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将sender的g不在阻塞
goready(gp, skip+1)
}
关闭 chan
close chan,主要是两个释放和将closed设置为1,sendq里面都是为发送成功的数据,直接丢弃,缓冲里面的数据,还可以继续读,如果recvq有等待者,表面chan里面是无数据的,直接释放。
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
// chan已经关闭,再关闭,panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
//将closed设置为1
c.closed = 1
var glist gList
// release all readers,释放所有的readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
//要返回值,将值设置为nil
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
放置到glist
glist.push(gp)
}
// release all writers (they will panic),释放所有挂住的的sender
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
总结
主旨:使用队列的先进先出能力,进行channel发送和接受任务管理
有缓冲则使用 环形数组+双向链表实现带缓存功能的channel
不带缓存功能的:双向链表
最后
以上就是激动信封为你收集整理的channel源代码实现channel实现方式说明思考包含的主要场景源码实现总结的全部内容,希望文章能够帮你解决channel源代码实现channel实现方式说明思考包含的主要场景源码实现总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复