channel是什么意思?深度解密Go语言之channel

知新问答网作者江湖小虾米2020-05-04 精选提问 阅读

并发与并行

大家都知道著名的摩尔定律。1965 年,时任仙童公司的 Gordon Moore 发表文章,预测在未来十年,半导体芯片上的晶体管和电阻数量将每年增加一倍;1975 年,Moore 再次发表论文,将“每年”修改为“每两年”。这个预测在 2012 年左右基本是正确的。

但随着晶体管电路逐渐接近性能极限,摩尔定律终将走到尽头。靠增加晶体管数量来提高计算机的性能不灵了。于是,人们开始转换思路,用其他方法来提升计算机的性能,这就是多核计算机产生的原因。

这一招看起来还不错,但是人们又遇到了一个另一个定律的限制,那就是 Amdahl's Law,它提出了一个模型用来衡量在并行模式下程序运行效率的提升。这个定律是说,一个程序能从并行上获得性能提升的上限取决于有多少代码必须写成串行的。

举个例子,对于一个和用户打交道的界面程序,它必须和用户打交道。用户点一个按钮,然后才能继续运行下一步,这必须是串行执行的。这种程序的运行效率就取决于和用户交互的速度,你有多少核都白瞎。用户就是不按下一步,你怎么办?

2000 年左右云计算兴起,人们可以方便地获取计算云上的资源,方便地水平扩展自己的服务,可以轻而易举地就调动多台机器资源甚至将计算任务分发到分布在全球范围的机器。但是也因此带来了很多问题和挑战。例如怎样在机器间进行通信、聚合结果等。最难的一个挑战是如何找到一个模型能用来描述 concurrent。

我们都知道,要想一段并发的代码没有任何 bug,是非常困难的。有些并发 bug 是在系统上线数年后才发现的,原因常常是很诡异的,比如用户数增加到了某个界限。

并发问题一般有下面这几种:

数据竞争。简单来说就是两个或多个线程同时读写某个变量,造成了预料之外的结果。

原子性。在一个定义好的上下文里,原子性操作不可分割。上下文的定义非常重要。有些代码,你在程序里看起来是原子的,如最简单的 i++,但在机器层面看来,这条语句通常需要几条指令来完成(Load,Incr,Store),不是不可分割的,也就不是原子性的。原子性可以让我们放心地构造并发安全的程序。

内存访问同步。代码中需要控制同时只有一个线程访问的区域称为临界区。Go 语言中一般使用 sync 包里的 Mutex 来完成同步访问控制。锁一般会带来比较大的性能开销,因此一般要考虑加锁的区域是否会频繁进入、锁的粒度如何控制等问题。

死锁。在一个死锁的程序里,每个线程都在等待其他线程,形成了一个首尾相连的尴尬局面,程序无法继续运行下去。

活锁。想象一下,你走在一条小路上,一个人迎面走来。你往左边走,想避开他;他做了相反的事情,他往右边走,结果两个都过不了。之后,两个人又都想从原来自己相反的方向走,还是同样的结果。这就是活锁,看起来都像在工作,但工作进度就是无法前进。

饥饿。并发的线程不能获取它所需要的资源以进行下一步的工作。通常是有一个非常贪婪的线程,长时间占据资源不释放,导致其他线程无法获得资源。

关于并发和并行的区别,引用一个经典的描述:

并发是同一时间应对(dealing with)多件事情的能力。 并行是同一时间动手(doing)做多件事情的能力。

雨痕老师《Go 语言学习笔记》上的解释:

并发是指逻辑上具备同时处理多个任务的能力;并行则是物理上同时执行多个任务。

而根据《Concurrency in Go》这本书,计算机的概念都是抽象的结果,并发和并行也不例外。它这样描述并发和并行的区别:

Concurrency is a property of the code; parallelism is a property of the running program.

并发是代码的特性,并行是正在运行的程序的特性。先忽略我拙劣的翻译。很新奇,不是吗?我也是第一次见到这样的说法,细想一下,还是很有道理的。

我们一直说写的代码是并发的或者是并行的,但是我们能提供什么保证吗?如果在只有一个核的机器上跑并行的代码,它还能并行吗?你就是再天才,也无法写出并行的程序。充其量也就是代码上看起来“并发”的,如此而已。

当然,表面上看起来还是并行的,但那不过 CPU 的障眼法,多个线程在分时共享 CPU 的资源,在一个粗糙的时间隔里看起来就是“并行”。

所以,我们实际上只能编写“并发”的代码,而不能编写“并行”的代码,而且只是希望并发的代码能够并行地执行。并发的代码能否并行,取决于抽象的层级:代码里的并发原语、runtime,操作系统(虚拟机、容器)。层级越来越底层,要求也越来越高。因此,我们谈并发或并行实际上要指定上下文,也就是抽象的层级。

《Concurrency in Go》书里举了一个例子:假如两个人同时打开电脑上的计算器程序,这两个程序肯定不会影响彼此,这就是并行。在这个例子中,上下文就是两个人的机器,而两个计算器进程就是并行的元素。

随着抽象层次的降低,并发模型实际上变得更难也更重要,而越低层次的并发模型对我们也越重要。要想并发程序正确地执行,就要深入研究并发模型。

在 Go 语言发布前,我们写并发代码时,考虑到的最底层抽象是:系统线程。Go 发布之后,在这条抽象链上,又加一个 goroutine。而且 Go 从著名的计算机科学家 Tony Hoare 那借来一个概念:channel。Tony Hoare 就是那篇著名文章《Communicating Sequential Processes》的作者。

看起来事情变得更加复杂,因为 Go 又引入了一个更底层的抽象,但事实并不是这样。因为 goroutine 并不是看起来的那样又抽象了一层,它其实是替代了系统线程。Gopher 在写代码的时候,并不会去关心系统线程,大部分时候只需要考虑到 goroutine 和 channel。当然有时候会用到一些共享内存的概念,一般就是指 sync 包里的东西,比如 sync.Mutex。

什么是 CSP

CSP 经常被认为是 Go 在并发编程上成功的关键因素。CSP 全称是 “Communicating Sequential Processes”,这也是 Tony Hoare 在 1978 年发表在 ACM 的一篇论文。论文里指出一门编程语言应该重视 input 和 output 的原语,尤其是并发编程的代码。

在那篇文章发表的时代,人们正在研究模块化编程的思想,该不该用 goto 语句在当时是最激烈的议题。彼时,面向对象编程的思想正在崛起,几乎没什么人关心并发编程。

在文章中,CSP 也是一门自定义的编程语言,作者定义了输入输出语句,用于 processes 间的通信(communicatiton)。processes 被认为是需要输入驱动,并且产生输出,供其他 processes 消费,processes 可以是进程、线程、甚至是代码块。输入命令是:!,用来向 processes 写入;输出是:?,用来从 processes 读出。这篇文章要讲的 channel 正是借鉴了这一设计。

Hoare 还提出了一个 -> 命令,如果 -> 左边的语句返回 false,那它右边的语句就不会执行。

通过这些输入输出命令,Hoare 证明了如果一门编程语言中把 processes 间的通信看得第一等重要,那么并发编程的问题就会变得简单。

Go 是第一个将 CSP 的这些思想引入,并且发扬光大的语言。仅管内存同步访问控制(原文是 memory access synchronization)在某些情况下大有用处,Go 里也有相应的 sync 包支持,但是这在大型程序很容易出错。

Go 一开始就把 CSP 的思想融入到语言的核心里,所以并发编程成为 Go 的一个独特的优势,而且很容易理解。

大多数的编程语言的并发编程模型是基于线程和内存同步访问控制,Go 的并发编程的模型则用 goroutine 和 channel 来替代。Goroutine 和线程类似,channel 和 mutex (用于内存同步访问控制)类似。

Goroutine 解放了程序员,让我们更能贴近业务去思考问题。而不用考虑各种像线程库、线程开销、线程调度等等这些繁琐的底层问题,goroutine 天生替你解决好了。

Channel 则天生就可以和其他 channel 组合。我们可以把收集各种子系统结果的 channel 输入到同一个 channel。Channel 还可以和 select, cancel, timeout 结合起来。而 mutex 就没有这些功能。

Go 的并发原则非常优秀,目标就是简单:尽量使用 channel;把 goroutine 当作免费的资源,随便用。

说明一下,前面这两部分的内容来自英文开源书《Concurrency In Go》,强烈推荐阅读。

引入结束,我们正式开始今天的主角:channel。

什么是 channel

Goroutine 和 channel 是 Go 语言并发编程的 两大基石。Goroutine 用于执行并发任务,channel 用于 goroutine 之间的同步、通信。

Channel 在 gouroutine 间架起了一条管道,在管道里传输数据,实现 gouroutine 间的通信;由于它是线程安全的,所以用起来非常方便;channel 还提供“先进先出”的特性;它还能影响 goroutine 的阻塞和唤醒。

相信大家一定见过一句话:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来通信,而要通过通信来实现内存共享。

这就是 Go 的并发哲学,它依赖 CSP 模型,基于 channel 实现。

简直是一头雾水,这两句话难道不是同一个意思?

通过前面两节的内容,我个人这样理解这句话:前面半句说的是通过 sync 包里的一些组件进行并发编程;而后面半句则是说 Go 推荐使用 channel 进行并发编程。两者其实都是必要且有效的。实际上看完本文后面对 channel 的源码分析,你会发现,channel 的底层就是通过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语,封装了更多的功能。

关于是选择 sync 包里的底层并发编程原语还是 channel,《Concurrency In Go》这本书的第 2 章 “Go's Philosophy on Concurrency” 里有一张决策树和详细的论述,再次推荐你去阅读。我把图贴出来:

channel 实现 CSP

Channel 是 Go 语言中一个非常重要的类型,是 Go 里的第一对象。通过 channel,Go 实现了通过通信来实现内存共享。Channel 是在多个 goroutine 之间传递数据和同步的重要手段。

使用原子函数、读写锁可以保证资源的共享访问安全,但使用 channel 更优雅。

channel 字面意义是“通道”,类似于 Linux 中的管道。声明 channel 的语法如下:

chan T // 声明一个双向通道

chan<- T // 声明一个只能用于发送的通道

<-chan T // 声明一个只能用于接收的通道

复制代码

单向通道的声明,用 <- 来表示,它指明通道的方向。你只要明白,代码的书写顺序是从左到右就马上能掌握通道的方向是怎样的。

因为 channel 是一个引用类型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函数进行初始化。可以向它传递一个 int 值,代表 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。

两者有一些差别:非缓冲型 channel 无法缓冲元素,对它的操作一定顺序是“发送-> 接收 -> 发送 -> 接收 -> ……”,如果连续向一个非缓冲 chan 发送 2 个元素,并且没有接收的话,第二次一定会被阻塞;对于缓冲型 channel 的操作,则要“宽松”一些,毕竟是带了“缓冲”光环。

为什么要 channel

Go 通过 channel 实现 CSP 通信模型,主要用于 goroutine 之间的消息传递和事件通知。

有了 channel 和 goroutine 之后,Go 的并发编程变得异常容易和安全,得以让程序员把注意力留到业务上去,实现开发效率的提升。

要知道,技术并不是最重要的,它只是实现业务的工具。一门高效的开发语言让你把节省下来的时间,留着去做更有意义的事情,比如写写文章。

channel 实现原理

对 chan 的发送和接收操作都会在编译期间转换成为底层的发送接收函数。

Channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作“同步模式”,带缓冲的则称为“异步模式”。

同步模式下,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输(后面会看到,实际上就是内存拷贝)。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。

异步模式下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。

小结一下:同步模式下,必须要使发送方和接收方配对,操作才会成功,否则会被阻塞;异步模式下,缓冲槽要有剩余容量,操作才会成功,否则也会被阻塞。

数据结构

直接上源码(版本是 1.9.2):

type hchan struct {

// chan 里元素数量

qcount uint

// chan 底层循环数组的长度

dataqsiz uint

// 指向底层循环数组的指针

// 只针对有缓冲的 channel

buf unsafe.Pointer

// chan 中元素大小

elemsize uint16

// chan 是否被关闭的标志

closed uint32

// chan 中元素类型

elemtype *_type // element type

// 已发送元素在循环数组中的索引

sendx uint // send index

// 已接收元素在循环数组中的索引

recvx uint // receive index

// 等待接收的 goroutine 队列

recvq waitq // list of recv waiters

// 等待发送的 goroutine 队列

sendq waitq // list of send waiters

// 保护 hchan 中所有字段

lock mutex

}

关于字段的含义都写在注释里了,再来重点说几个字段:

buf 指向底层循环数组,只有缓冲型的 channel 才有。

sendx,recvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。

sendq,recvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。

waitq 是 sudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:

type waitq struct {

first *sudog

last *sudog

}

lock 用来保证每个读 channel 或写 channel 的操作都是原子的。

例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :

创建

我们知道,通道有两个方向,发送和接收。理论上来说,我们可以创建一个只发送或只接收的通道,但是这种通道创建出来后,怎么使用呢?一个只能发的通道,怎么接收呢?同样,一个只能收的通道,如何向其发送数据呢?

一般而言,使用 make 创建一个能收能发的通道:

// 无缓冲通道

ch1 := make(chan int)

// 有缓冲通道

ch2 := make(chan int, 10)

通过汇编分析,我们知道,最终创建 chan 的函数是 makechan:

func makechan(t *chantype, size int64) *hchan

从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。

具体来看下代码:

新建一个 chan 后,内存在堆上分配,大概长这样:

说明一下,这张图来源于 Gopher Con 上的一份 PPT,地址见参考资料。这份材料非常清晰易懂,推荐你去读。

接下来,我们用一个来自参考资料【深入 channel 底层】的例子来理解创建、发送、接收的整个过程。

首先创建了一个无缓冲的 channel,接着启动两个 goroutine,并将前面创建的 channel 传递进去。然后,向这个 channel 中发送数据 3,最后 sleep 1 秒后程序退出。

程序第 14 行创建了一个非缓冲型的 channel,我们只看 chan 结构体中的一些重要字段,来从整体层面看一下 chan 的状态,一开始什么都没有:

接收

在继续分析前面小节的例子前,我们先来看一下接收相关的源码。在清楚了接收的具体过程之后,也就能轻松理解具体的例子了。

接收操作有两种写法,一种带 "ok",反应 channel 是否关闭;一种不带 "ok",这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。两种写法,都有各自的应用场景。

经过编译器的处理后,这两种写法最后对应源码里的这两个函数:

// entry points for <- c from compiled code

func chanrecv1(c *hchan, elem unsafe.Pointer) {

chanrecv(c, elem, true)

}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {

_, received = chanrecv(c, elem, true)

return

}

chanrecv1 函数处理不带 "ok" 的情形,chanrecv2 则通过返回 "received" 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem 所指向的地址了,这很像 C/C++ 里的写法。如果代码里忽略了接收值,这里的 elem 为 nil。

无论如何,最终转向了 chanrecv 函数:

上面的代码注释地比较详细了,你可以对着源码一行行地去看,我们再来详细看一下。

如果 channel 是一个空值(nil),在非阻塞模式下,会直接返回。在阻塞模式下,会调用 gopark 函数挂起 goroutine,这个会一直阻塞下去。因为在 channel 是 nil 的情况下,要想不阻塞,只有关闭它,但关闭一个 nil 的 channel 又会发生 panic,所以没有机会被唤醒了。更详细地可以在 closechan 函数的时候再看。

和发送函数一样,接下来搞了一个在非阻塞模式下,不用获取锁,快速检测到失败并且返回的操作。顺带插一句,我们平时在写代码的时候,找到一些边界条件,快速返回,能让代码逻辑更清晰,因为接下来的正常情况就比较少,更聚焦了,看代码的人也更能专注地看核心代码逻辑了。

// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回 (false, false)

if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||

c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&

atomic.Load(&c.closed) == 0 {

return

}

当我们观察到 channel 没准备好接收:

非缓冲型,等待发送列队里没有 goroutine 在等待

缓冲型,但 buf 里没有元素

之后,又观察到 closed == 0,即 channel 未关闭。

因为 channel 不可能被重复打开,所以前一个观测的时候, channel 也是未关闭的,因此在这种情况下可以直接宣布接收失败,快速返回。因为没被选中,也没接收到数据,所以返回值为 (false, false)。

接下来的操作,首先会上一把锁,粒度比较大。如果 channel 已关闭,并且循环数组 buf 里没有元素。对应非缓冲型关闭和缓冲型关闭但 buf 无元素的情况,返回对应类型的零值,但 received 标识是 false,告诉调用者此 channel 已关闭,你取出来的值并不是正常由发送者发送过来的数据。但是如果处于 select 语境下,这种情况是被选中了的。很多将 channel 用作通知信号的场景就是命中了这里。

接下来,如果有等待发送的队列,说明 channel 已经满了,要么是非缓冲型的 channel,要么是缓冲型的 channel,但 buf 满了。这两种情况下都可以正常接收数据。

于是,调用 recv 函数:

如果是非缓冲型的,就直接从发送者的栈拷贝到接收者的栈。

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {

// dst is on our stack or the heap, src is on another stack.

src := sg.elem

typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)

memmove(dst, src, t.size)

}

否则,就是缓冲型 channel,而 buf 又满了的情形。说明发送游标和接收游标重合了,因此需要先找到接收游标:

// chanbuf(c, i) is pointer to the i'th slot in the buffer.

func chanbuf(c *hchan, i uint) unsafe.Pointer {

return add(c.buf, uintptr(i)*uintptr(c.elemsize))

}

将该处的元素拷贝到接收地址。然后将发送者待发送的数据拷贝到接收游标处。这样就完成了接收数据和发送数据的操作。接着,分别将发送游标和接收游标向前进一,如果发生“环绕”,再从 0 开始。

最后,取出 sudog 里的 goroutine,调用 goready 将其状态改成 “runnable”,待发送者被唤醒,等待调度器的调度。

然后,如果 channel 的 buf 里还有数据,说明可以比较正常地接收。注意,这里,即使是在 channel 已经关闭的情况下,也是可以走到这里的。这一步比较简单,正常地将 buf 里接收游标处的数据拷贝到接收数据的地址。

到了最后一步,走到这里来的情形是要阻塞的。当然,如果 block 传进来的值是 false,那就不阻塞,直接返回就好了。

先构造一个 sudog,接着就是保存各种值了。注意,这里会将接收数据的地址存储到了 elem 字段,当被唤醒时,接收到的数据就会保存到这个字段指向的地址。然后将 sudog 添加到 channel 的 recvq 队列里。调用 goparkunlock 函数将 goroutine 挂起。

接下来的代码就是 goroutine 被唤醒后的各种收尾工作了。

我们继续之前的例子。前面说到第 14 行,创建了一个非缓冲型的 channel,接着,第 15、16 行分别创建了一个 goroutine,各自执行了一个接收操作。通过前面的源码分析,我们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操作。G1 和 G2 会挂在 channel 的 recq 队列中,形成一个双向循环链表。

在程序的 17 行之前,chan 的整体数据结构如下:

buf 指向一个长度为 0 的数组,qcount 为 0,表示 channel 中没有元素。重点关注 recvq 和 sendq,它们是 waitq 结构体,而 waitq 实际上就是一个双向链表,链表的元素是 sudog,里面包含 g 字段,g 表示一个 goroutine,所以 sudog 可以看成一个 goroutine。recvq 存储那些尝试读取 channel 但被阻塞的 goroutine,sendq 则存储那些尝试写入 channel,但被阻塞的 goroutine。

此时,我们可以看到,recvq 里挂了两个 goroutine,也就是前面启动的 G1 和 G2。因为没有 goroutine 接收,而 channel 又是无缓冲类型,所以 G1 和 G2 被阻塞。sendq 没有被阻塞的 goroutine。

recvq 的数据结构如下。这里直接引用文章中的一幅图,用了三维元素,画得很好:

再从整体上来看一下 chan 此时的状态:

G1 和 G2 被挂起了,状态是 WAITING。关于 goroutine 调度器这块不是今天的重点,当然后面肯定会写相关的文章。这里先简单说下,goroutine 是用户态的协程,由 Go runtime 进行管理,作为对比,内核线程由 OS 进行管理。Goroutine 更轻量,因此我们可以轻松创建数万 goroutine。

一个内核线程可以管理多个 goroutine,当其中一个 goroutine 阻塞时,内核线程可以调度其他的 goroutine 来运行,内核线程本身不会阻塞。这就是通常我们说的 M:N 模型:

M:N 模型通常由三部分构成:M、P、G。M 是内核线程,负责运行 goroutine;P 是 context,保存 goroutine 运行所需要的上下文,它还维护了可运行(runnable)的 goroutine 列表;G 则是待运行的 goroutine。M 和 P 是 G 运行的基础。