Golang Channel 原理
Golang Channel 原理
channel 的底层结构
一把锁 一个环形数组(可选) 两个等待队列(sendq / recvq)
type hchan struct {
qcount uint // 当前队列中元素个数
dataqsiz uint // 环形缓冲区大小
buf unsafe.Pointer // 指向环形数组
elemsize uint16
closed uint32
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 等待接收的 goroutine 队列
sendq waitq // 等待发送的 goroutine 队列
lock mutex
}
**sudog **的底层结构
保存 goroutine
保存发送数据地址
type sudog struct {
g *g
elem unsafe.Pointer
}
Send
// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 锁住 channel,并发安全
lock(&c.lock)
// 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 对于缓冲型的 channel,如果还有缓冲空间
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)
// 将数据从 ep 处拷贝到 qp
typedmemmove(c.elemtype, qp, ep)
// 发送游标值加 1
c.sendx++
// 如果发送游标值等于容量值,游标值归 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 缓冲区的元素数量加一
c.qcount++
// 解锁
unlock(&c.lock)
return true
}
// 如果不需要阻塞,则直接返回错误
if !block {
unlock(&c.lock)
return false
}
// channel 满了,发送方会被阻塞。接下来会构造一个 sudog
// 获取当前 goroutine 的指针
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 当前 goroutine 进入发送等待队列
c.sendq.enqueue(mysg)
// 当前 goroutine 被挂起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 去掉 mysg 上绑定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}
1.无缓冲 channel(dataqsiz == 0)
有 goroutine 在等待接收
- 从 recvq 取出一个 goroutine
- 直接把发送数据 copy 到接收方栈空间
- 唤醒接收 goroutine
- 发送方直接返回
没有 goroutine 在等待接收
- 当前发送 goroutine 封装成 sudog
- 加入 sendq 队列
- 调用 gopark 挂起
- 让出 CPU,发送方进入休眠
后续
- 接收者发现 sendq 非空
- 直接取出 sudog
- 拿到数据
- 唤醒发送者
2.有缓冲 channel(dataqsiz > 0)
缓冲区没满(qcount < dataqsiz)
- 把数据 copy 到 buf[sendx]
- sendx++
- qcount++
- 如果 recvq 有等待 goroutine:
唤醒一个
- 发送方直接返回
缓冲区满(qcount == dataqsiz)
- 当前 goroutine 封装为 sudog
- 加入 sendq
- gopark 挂起
之后
- 接收者从 buf 取出一个元素
- 如果 sendq 非空:
取出一个等待发送者 把它的数据放入 buf 唤醒发送者
Recv
1.无缓冲 channel(dataqsiz == 0)
有 goroutine 在等待发送
- 从 sendq 取出一个 sudog
- 直接把发送者的数据拷贝到接收者变量
- 唤醒发送者
- 接收返回
没有 goroutine 在等待发送
- 当前 goroutine 封装为 sudog
- 加入 recvq
- 调用 gopark()
- 阻塞等待
后来有发送者
- 发送者发现 recvq 非空
- 直接拷贝数据
- 唤醒接收者
2.有缓冲 channel(dataqsiz > 0)
缓冲区有数据(qcount > 0)
- 从 buf[recvx] 取数据,recvx++,qcount–
如果 sendq 非空:
- 取出一个等待发送者
- 把它的数据放入 buf [sendx]
- sendx++
- qcount++
- 唤醒发送者
缓冲区为空(qcount == 0)
- 如果 sendq 非空:
说明有阻塞发送者 直接拿数据(不走缓冲) 唤醒发送者 返回
- 如果 channel 已关闭:
返回零值(ok=false)
- 否则:
当前 goroutine 加入 recvq gopark 阻塞
close
func closechan(c *hchan) {
// 上锁
lock(&c.lock)
// 如果 channel 已经关闭
if c.closed != 0 {
unlock(&c.lock)
// panic
panic(plainError("close of closed channel"))
}
// 修改关闭状态
c.closed = 1
var glist *g
// 将 channel 所有等待接收队列的里 sudog 释放
for {
// 从接收队列里出队一个 sudog
sg := c.recvq.dequeue()
// 出队完毕,跳出循环
if sg == nil {
break
}
// 给它赋一个相应类型的零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
// 取出 goroutine
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 相连,形成链表
gp.schedlink.set(glist)
glist = gp
}
// 将 channel 等待发送队列里的 sudog 释放
// 如果存在,这些 goroutine 将会 panic
for {
// 从发送队列里出队一个 sudog
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 发送者会 panic
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 形成链表
gp.schedlink.set(glist)
glist = gp
}
// 解锁
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 遍历链表
for glist != nil {
// 取最后一个
gp := glist
// 向前走一步,下一个唤醒的 g
glist = glist.schedlink.ptr()
gp.schedlink = 0
// 唤醒相应 goroutine
goready(gp, 3)
}
}
广播所有的接收者和发送者
对于接收者:会收到一个相应类型的零值
等待发送者: 会执行代码 if c.closed != 0panic(“send on closed channel”),会报错