jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

深入Golang之channel

作者:jasper | 分类:Golang | 标签:   | 阅读 147 次 | 发布:2017-10-15 6:53 p.m.

在Golang中channel被广泛地运用,它可以被存储到变量中,可以作为参数传递给函数,也可以作为函数的返回值返回。在Golang中常常使用channel+goroutine来实现并发的控制并发,下面来了解下channel的内部实现。

数据结构

type hchan struct {
    qcount   uint           // 队列中总数据量
    dataqsiz uint           // 环形队列q的数据大小
    buf      unsafe.Pointer // 指向dataqsiz数组的指针
    elemsize uint16
    closed   uint32
    elemtype *_type // 元素类型
    sendx    uint   // 发送index
    recvx    uint   // 接收index
    recvq    waitq  // 因recv而阻塞的等待队列
    sendq    waitq  // 因send而阻塞的等待队列

    lock mutex
}

上面这个结构体的核心就是存放channel数据的环形队列,但是在这个结构体中只有队列的大小,没看到存放数据的地方啊,这就需要来看看这个waitq了:

type waitq struct {
    first *sudog
    last  *sudog
} 

type sudog struct {
    g          *g
    selectdone *uint32 
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer 

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog 
    waitlink    *sudog
    waittail    *sudog 
    c           *hchan
}

recvq和sendq是两个链表,一个是读操作阻塞在channel的goroutine 列表,另一个是写操作阻塞在channel的goroutine列表。waitq是链表的定义,包括一个头一个尾节点,实现则是sudog,该结构中主要的就是一个g和一个elem。elem用于存储goroutine的数据。读通道时,数据会从hchan的队列中拷贝到sudog的elem域。写通道时,数据则是由sudog的elem域拷贝到hchan的队列中。

创建channel

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    var c *hchan
    if elem.kind&kindNoPointers != 0 || size == 0 {
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            c.buf = unsafe.Pointer(c)
        }
    } else {
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}

如果size等于0,其实就是无缓冲channel,或者队列里面的元素类型没有指针时,初始化就是:

c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))

调用mallocgc将其分配在连续的内存区域。如果size=0,实际上buf是不分配空间的。

如果size>0,通过newarray来分配,当然底层也是用的mallocgc。

总结下来,就是make chan的过程是在堆上进行分配,返回是一个hchan 的指针。

写channel操作

写channel的过程在函数chansend中:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

}

由于代码较长,这里就不贴了,大体总结一下过程:

当channel为nil的情况时,gopark会将当前goroutine休眠,然后通过unlockf来唤醒,这时传入的unlockf是nil,也就是向nil channel发送数据的goroutine会一直休眠,Golang中会一直检测运行情况,遇到这样的情况就会报出throw("all goroutines are asleep - deadlock!")的异常。

再判断当前的channel是否关闭了,如果是已经关闭了的,则直接panic:panic(plainError("send on closed channel"))

正式写的时候分为带缓冲和不带缓冲的channel,二者主要区别在于什么时候阻塞读写,总的来说:

当goroutine阻塞在channel上。如果hchan.buf为空,从当前channel的等待队列中取出等待的goroutine,然后调用send方法;如果hchan.buf还有可用空间,将数据放到buffer里面;如果hchan.buf已满,则阻塞当前channel。

读channel操作

读channel的过程函数在chanrecv中:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

}

具体过程和上面的写大体相似,除了阻塞的判定,以及enqueue换成了dequeue。

关闭channel操作

关闭channel最主要的操作是要清空sendq和recvq:

清空sendq:

for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }

清空recvq:

for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }

其实就是分别遍历recvq和sendq队列,将所有的数据放到glist队列中,最后使用goready唤醒glist队列中的goroutine。

总结

在这里对channel的原理做了大体的介绍,针对channel和select的使用原理,我们在下一篇select中再来细谈。


转载请注明出处:http://www.opscoder.info/golang_channel.html

【上一篇】 深入Golang之interface
【下一篇】 深入Golang之select
其他分类: