Channel
提出问题
经过自己很长一段时间的实验,发现带着问题来阅读源码是非常高效的,因为源码一般大而杂,分支也比较多,稍不留神就容易走进细枝末节跳不出来了。所以我们这里先抛出几个问题,然后带着问题一起去源码中寻找答案。
channel的数据结构是怎么,内部又包含了哪些组件
goroutine向channel中发送数据的阻塞机制是什么
有缓冲和无缓冲在实现过程中有哪些区别
channel-safe的实现机制是什么
write-to-channel 与 read-from-channel 是怎么协作的
channel在关闭之后还可以做到能读不能写,那关闭时需要注意什么
接下来我们将一些比较重要的源码,以及源码分析过程中的备注都贴到下方。
Channel定义
源码位置 /src/runtime/chan.go
channel 的实现,很明显是使用队列来完成。
大家都知道channel是线程安全的,那么它是怎么实现的呢?答案很简单就是锁机制。
sudog 源码位置 src/runtime/runtime2.go sudog 是一种非常重要的数据结构,相当于我们前面介绍的G。
make channel
创建一个go channel
makechan 这里有几个需要注意的点。
根据channel中传递的元素类型(
makechan(type,size)),计算出一块内存大小,后期创建一块内存区域, 对应代码是
qcount记录的是buf 中的数据数量,后面会有增减。dataqsiz记录的是makechan(type,size)中的size,不会变。要记住,要不然后面会混。
chansend
向channel中发送数据。
前面我们提到过,channel线程安全的原因就是有锁机制,所以向channel中写入数据的话,首先要获取锁,然后发送数据,发送完成之后,释放锁。
通过代码我们能看出,发送数据时,首先会查看recvq中是否有是否有goroutine在等待,如果有直接讲数据发送给他。
发送数据到channel实际调用的是runtime.chansend1函数, chansend1函数调用了chansend函数, 流程是:
下面的文字引用自 协程的实现原理
检查channel.recvq是否有等待中的接收者的G
如果有, 表示channel无缓冲区或者缓冲区为空
调用send函数
如果sudog.elem不等于nil, 调用sendDirect函数从发送者直接复制元素
等待接收的sudog.elem是指向接收目标的内存的指针, 如果是接收目标是_则elem是nil, 可以省略复制
等待发送的sudog.elem是指向来源目标的内存的指针
复制后调用goready恢复发送者的G
切换到g0调用ready函数, 调用完切换回来
把G的状态由等待中(_Gwaiting)改为待运行(_Grunnable)
把G放到P的本地运行队列
如果当前有空闲的P, 但是无自旋的M(nmspinning等于0), 则唤醒或新建一个M
从发送者拿到数据并唤醒了G后, 就可以从chansend返回了
判断是否可以把元素放到缓冲区中
如果缓冲区有空余的空间, 则把元素放到缓冲区并从chansend返回
无缓冲区或缓冲区已经写满, 发送者的G需要等待
获取当前的g
新建一个sudog
设置sudog.elem = 指向发送内存的指针
设置sudog.g = g
设置sudog.c = channel
设置g.waiting = sudog
把sudog放入channel.sendq
调用goparkunlock函数
调用gopark函数
通过mcall函数调用park_m函数
mcall函数和上面说明的一样, 会把当前的状态保存到g.sched, 然后切换到g0和g0的栈空间并执行指定的函数
park_m函数首先把G的状态从运行中(_Grunning)改为等待中(_Gwaiting)
然后调用dropg函数解除M和G之间的关联
再调用传入的解锁函数, 这里的解锁函数会对解除channel.lock的锁定
最后调用schedule函数继续调度
从这里恢复表示已经成功发送或者channel已关闭
检查sudog.param是否为nil, 如果为nil表示channel已关闭, 抛出panic
否则释放sudog然后返回
chanrecv
与发送数据一样,读取数据也要首先获取锁,然后才能读取。
画重点:
下面这段代码很重要,它解释了sendx和recvx的大小,最大recvx不会超过dataqsiz
下面的文字引用自 协程的实现原理
从channel接收数据实际调用的是runtime.chanrecv1函数, chanrecv1函数调用了chanrecv函数, 流程是:
检查channel.sendq中是否有等待中的发送者的G
如果有, 表示channel无缓冲区或者缓冲区已满, 这两种情况需要分别处理(为了保证入出队顺序一致)
调用recv函数
如果无缓冲区, 调用recvDirect函数把元素直接复制给接收者
如果有缓冲区代表缓冲区已满
把队列中下一个要出队的元素直接复制给接收者
把发送的元素复制到队列中刚才出队的位置
这时候缓冲区仍然是满的, 但是发送序号和接收序号都会增加1
复制后调用goready恢复接收者的G, 处理同上
把数据交给接收者并唤醒了G后, 就可以从chanrecv返回了
判断是否可以从缓冲区获取元素
如果缓冲区有元素, 则直接取出该元素并从chanrecv返回
无缓冲区或缓冲区无元素, 接收者的G需要等待
获取当前的g
新建一个sudog
设置sudog.elem = 指向接收内存的指针
设置sudog.g = g
设置sudog.c = channel
设置g.waiting = sudog
把sudog放入channel.recvq
调用goparkunlock函数, 处理同上
从这里恢复表示已经成功接收或者channel已关闭
检查sudog.param是否为nil, 如果为nil表示channel已关闭
和发送不一样的是接收不会抛panic, 会通过返回值通知channel已关闭
释放sudog然后返回
close
下面的文字引用自 协程的实现原理
关闭channel实际调用的是closechan函数, 流程是:
设置channel.closed = 1
枚举channel.recvq, 清零它们sudog.elem, 设置sudog.param = nil
枚举channel.sendq, 设置sudog.elem = nil, 设置sudog.param = nil
调用goready函数恢复所有接收者和发送者的G
参考链接
Last updated