Golang Channel 原理

Golang Channel 原理

channel 的底层结构

一把锁 一个环形数组(可选) 两个等待队列(sendq / recvq)

type hchan struct {
    qcount   uint           // 当前队列中元素个数
    dataqsiz uint           // 环形缓冲区大小
    buf      unsafe.Pointer // 指向环形数组
    elemsize uint16
    closed   uint32
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // 等待接收的 goroutine 队列
    sendq    waitq          // 等待发送的 goroutine 队列
    lock     mutex
}

**sudog **的底层结构

保存 goroutine

保存发送数据地址

type sudog struct {
    g *g
    elem unsafe.Pointer
}

Send

// 位于 src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

    // 锁住 channel,并发安全
    lock(&c.lock)

    // 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 对于缓冲型的 channel,如果还有缓冲空间
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // 将数据从 ep 处拷贝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 发送游标值加 1
        c.sendx++
        // 如果发送游标值等于容量值,游标值归 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 缓冲区的元素数量加一
        c.qcount++

        // 解锁
        unlock(&c.lock)
        return true
    }

    // 如果不需要阻塞,则直接返回错误
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 满了,发送方会被阻塞。接下来会构造一个 sudog

    // 获取当前 goroutine 的指针
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 当前 goroutine 进入发送等待队列
    c.sendq.enqueue(mysg)

    // 当前 goroutine 被挂起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

   
    // 去掉 mysg 上绑定的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

1.无缓冲 channel(dataqsiz == 0)

有 goroutine 在等待接收

  1. recvq 取出一个 goroutine
  2. 直接把发送数据 copy 到接收方栈空间
  3. 唤醒接收 goroutine
  4. 发送方直接返回

没有 goroutine 在等待接收

  1. 当前发送 goroutine 封装成 sudog
  2. 加入 sendq 队列
  3. 调用 gopark 挂起
  4. 让出 CPU,发送方进入休眠

后续

  1. 接收者发现 sendq 非空
  2. 直接取出 sudog
  3. 拿到数据
  4. 唤醒发送者

2.有缓冲 channel(dataqsiz > 0)

缓冲区没满(qcount < dataqsiz)

  1. 把数据 copy 到 buf[sendx]
  2. sendx++
  3. qcount++
  4. 如果 recvq 有等待 goroutine:

​ 唤醒一个

  1. 发送方直接返回

缓冲区满(qcount == dataqsiz)

  1. 当前 goroutine 封装为 sudog
  2. 加入 sendq
  3. gopark 挂起

之后

  1. 接收者从 buf 取出一个元素
  2. 如果 sendq 非空:

​ 取出一个等待发送者 ​ 把它的数据放入 buf ​ 唤醒发送者

Recv

1.无缓冲 channel(dataqsiz == 0)

有 goroutine 在等待发送

  1. sendq 取出一个 sudog
  2. 直接把发送者的数据拷贝到接收者变量
  3. 唤醒发送者
  4. 接收返回

没有 goroutine 在等待发送

  1. 当前 goroutine 封装为 sudog
  2. 加入 recvq
  3. 调用 gopark()
  4. 阻塞等待

后来有发送者

  1. 发送者发现 recvq 非空
  2. 直接拷贝数据
  3. 唤醒接收者

2.有缓冲 channel(dataqsiz > 0)

缓冲区有数据(qcount > 0)

  1. 从 buf[recvx] 取数据,recvx++,qcount–

如果 sendq 非空:

  1. 取出一个等待发送者
  2. 把它的数据放入 buf [sendx]
  3. sendx++
  4. qcount++
  5. 唤醒发送者

缓冲区为空(qcount == 0)

  1. 如果 sendq 非空:

​ 说明有阻塞发送者 ​ 直接拿数据(不走缓冲) ​ 唤醒发送者 ​ 返回

  1. 如果 channel 已关闭:

​ 返回零值(ok=false)

  1. 否则:

​ 当前 goroutine 加入 recvq ​ gopark 阻塞

close

func closechan(c *hchan) {
    // 上锁
    lock(&c.lock)
    // 如果 channel 已经关闭
    if c.closed != 0 {
        unlock(&c.lock)
        // panic
        panic(plainError("close of closed channel"))
    }


    // 修改关闭状态
    c.closed = 1

    var glist *g

    // 将 channel 所有等待接收队列的里 sudog 释放
    for {
        // 从接收队列里出队一个 sudog
        sg := c.recvq.dequeue()
        // 出队完毕,跳出循环
        if sg == nil {
            break
        }
        // 给它赋一个相应类型的零值
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }

        // 取出 goroutine
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 相连,形成链表
        gp.schedlink.set(glist)
        glist = gp
    }

    // 将 channel 等待发送队列里的 sudog 释放
    // 如果存在,这些 goroutine 将会 panic
    for {
        // 从发送队列里出队一个 sudog
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 发送者会 panic
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 形成链表
        gp.schedlink.set(glist)
        glist = gp
    }
    // 解锁
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 遍历链表
    for glist != nil {
        // 取最后一个
        gp := glist
        // 向前走一步,下一个唤醒的 g
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        // 唤醒相应 goroutine
        goready(gp, 3)
    }
}

广播所有的接收者和发送者

对于接收者:会收到一个相应类型的零值

等待发送者: 会执行代码 if c.closed != 0panic(“send on closed channel”),会报错