Channel

提出问题

经过自己很长一段时间的实验,发现带着问题来阅读源码是非常高效的,因为源码一般大而杂,分支也比较多,稍不留神就容易走进细枝末节跳不出来了。所以我们这里先抛出几个问题,然后带着问题一起去源码中寻找答案。

  • channel的数据结构是怎么,内部又包含了哪些组件

  • goroutine向channel中发送数据的阻塞机制是什么

  • 有缓冲和无缓冲在实现过程中有哪些区别

  • channel-safe的实现机制是什么

  • write-to-channel 与 read-from-channel 是怎么协作的

  • channel在关闭之后还可以做到能读不能写,那关闭时需要注意什么

接下来我们将一些比较重要的源码,以及源码分析过程中的备注都贴到下方。

Channel定义

源码位置 /src/runtime/chan.goarrow-up-right

channel 的实现,很明显是使用队列来完成。

大家都知道channel是线程安全的,那么它是怎么实现的呢?答案很简单就是锁机制。

sudog 源码位置 src/runtime/runtime2.goarrow-up-right sudog 是一种非常重要的数据结构,相当于我们前面介绍的G。

make channel

创建一个go channel

makechan 这里有几个需要注意的点。

  • 根据channel中传递的元素类型(makechan(type,size)),计算出一块内存大小,后期创建一块内存区域, 对应代码是

  • qcount 记录的是buf 中的数据数量,后面会有增减。dataqsiz 记录的是 makechan(type,size) 中的size,不会变。要记住,要不然后面会混。

chansend

向channel中发送数据。

前面我们提到过,channel线程安全的原因就是有锁机制,所以向channel中写入数据的话,首先要获取锁,然后发送数据,发送完成之后,释放锁。

通过代码我们能看出,发送数据时,首先会查看recvq中是否有是否有goroutine在等待,如果有直接讲数据发送给他。

发送数据到channel实际调用的是runtime.chansend1函数, chansend1函数调用了chansend函数, 流程是:

下面的文字引用自 协程的实现原理arrow-up-right

  • 检查channel.recvq是否有等待中的接收者的G

    • 如果有, 表示channel无缓冲区或者缓冲区为空

    • 调用send函数

      • 如果sudog.elem不等于nil, 调用sendDirect函数从发送者直接复制元素

      • 等待接收的sudog.elem是指向接收目标的内存的指针, 如果是接收目标是_则elem是nil, 可以省略复制

      • 等待发送的sudog.elem是指向来源目标的内存的指针

      • 复制后调用goready恢复发送者的G

        • 切换到g0调用ready函数, 调用完切换回来

          • 把G的状态由等待中(_Gwaiting)改为待运行(_Grunnable)

          • 把G放到P的本地运行队列

          • 如果当前有空闲的P, 但是无自旋的M(nmspinning等于0), 则唤醒或新建一个M

    • 从发送者拿到数据并唤醒了G后, 就可以从chansend返回了

  • 判断是否可以把元素放到缓冲区中

    • 如果缓冲区有空余的空间, 则把元素放到缓冲区并从chansend返回

  • 无缓冲区或缓冲区已经写满, 发送者的G需要等待

    • 获取当前的g

    • 新建一个sudog

    • 设置sudog.elem = 指向发送内存的指针

    • 设置sudog.g = g

    • 设置sudog.c = channel

    • 设置g.waiting = sudog

    • 把sudog放入channel.sendq

    • 调用goparkunlock函数

      • 调用gopark函数

        • 通过mcall函数调用park_m函数

          • mcall函数和上面说明的一样, 会把当前的状态保存到g.sched, 然后切换到g0和g0的栈空间并执行指定的函数

          • park_m函数首先把G的状态从运行中(_Grunning)改为等待中(_Gwaiting)

          • 然后调用dropg函数解除M和G之间的关联

          • 再调用传入的解锁函数, 这里的解锁函数会对解除channel.lock的锁定

          • 最后调用schedule函数继续调度

  • 从这里恢复表示已经成功发送或者channel已关闭

    • 检查sudog.param是否为nil, 如果为nil表示channel已关闭, 抛出panic

    • 否则释放sudog然后返回

chanrecv

与发送数据一样,读取数据也要首先获取锁,然后才能读取。

画重点:

下面这段代码很重要,它解释了sendx和recvx的大小,最大recvx不会超过dataqsiz

下面的文字引用自 协程的实现原理arrow-up-right

从channel接收数据实际调用的是runtime.chanrecv1函数, chanrecv1函数调用了chanrecv函数, 流程是:

  • 检查channel.sendq中是否有等待中的发送者的G

    • 如果有, 表示channel无缓冲区或者缓冲区已满, 这两种情况需要分别处理(为了保证入出队顺序一致)

    • 调用recv函数

      • 如果无缓冲区, 调用recvDirect函数把元素直接复制给接收者

      • 如果有缓冲区代表缓冲区已满

        • 把队列中下一个要出队的元素直接复制给接收者

        • 把发送的元素复制到队列中刚才出队的位置

        • 这时候缓冲区仍然是满的, 但是发送序号和接收序号都会增加1

      • 复制后调用goready恢复接收者的G, 处理同上

    • 把数据交给接收者并唤醒了G后, 就可以从chanrecv返回了

  • 判断是否可以从缓冲区获取元素

    • 如果缓冲区有元素, 则直接取出该元素并从chanrecv返回

  • 无缓冲区或缓冲区无元素, 接收者的G需要等待

    • 获取当前的g

    • 新建一个sudog

    • 设置sudog.elem = 指向接收内存的指针

    • 设置sudog.g = g

    • 设置sudog.c = channel

    • 设置g.waiting = sudog

    • 把sudog放入channel.recvq

    • 调用goparkunlock函数, 处理同上

  • 从这里恢复表示已经成功接收或者channel已关闭

    • 检查sudog.param是否为nil, 如果为nil表示channel已关闭

    • 和发送不一样的是接收不会抛panic, 会通过返回值通知channel已关闭

    • 释放sudog然后返回

close

下面的文字引用自 协程的实现原理arrow-up-right

关闭channel实际调用的是closechan函数, 流程是:

  • 设置channel.closed = 1

  • 枚举channel.recvq, 清零它们sudog.elem, 设置sudog.param = nil

  • 枚举channel.sendq, 设置sudog.elem = nil, 设置sudog.param = nil

  • 调用goready函数恢复所有接收者和发送者的G

参考链接

Last updated