3.2 通过channel通信来共享内存 #
- goroutine是Go程序并发的执行体,****channel(/ˈtʃænl/,通道**)**是它们之间的连接。
- 通道是可以让一个goroutine发送特定值到另一个goroutine的通信机制。每一个通道是一个具体类型的导管,叫作通道的元素类型。如一个有int类型元素的通道写为chan int;
- **像map一样,通道是一个使用内置的make函数创建的数据结构的引用(**pointer、slice、map、function、channel****为引用类型)。
- 当复制或者作为参数传递到一个函数时,复制的是引用,这样调用者和被调用者都引用同一份数据结构;
- 和其他引用类型一样,通道的零值是nil;
- 可比较性:同种类型的通道可以使用==符号进行比较:**当二者都是同一通道数据的引用时,比较值为true;**通道也可以和nil进行比较;
1 2 3
ch := make(chan int) // ch的类型: 'chan int' , unbuffered channel(无缓冲通道) ch = make(chan int, 0) // unbuffered channel ch = make(chan int, 3) // 通道容量3的缓冲通道
- 通道有三个主要操作,都使用
<-
运算符,send、receive统称为通信:- 发送(send)语句:
ch <- x
,从一个goroutine传输一个值到另一个在执行接收表达式的goroutine; - 接收(receive)语句:
x = <- ch
;<-ch
一个不使用接收结果的接收操作也是合法的; - 关闭(close):
close(ch)
设置一个标志位来指示值当前已经发送完毕,这个通道后面没有值了;- 关闭后的再次send发送操作将导致panic宕机;
- 在一个已经关闭的通道上进行receive接收操作,将获取所有已经发送的值,直到通道为空;这时任何接收操作会立即完成,同时获取到一个通道元素类型对应的零值;
- 发送(send)语句:
1. 无缓冲通道/同步通道 #
- 无缓冲通道上的发送操作将会阻塞,直到另一个goroutine在对应的通道上执行完接收操作,这时值传送完成,两个goroutine都可以恢复继续执行;
- 如果接收操作先执行,接收方goroutine将阻塞,直到另一个goroutine在同一个通道上发送一个值;
- 使用无缓冲通道进行的通信导致发送和接收goroutine同步化(同步通道)。当一个值在无缓冲通道上传递时,接收值后发送方goroutine才被再次唤醒;
- ***happens before:***Go语言并发内存模型的一个关键术语
- 在讨论并发的时候,当我们说x早于y发生时,不仅仅是说x发生的时间早于y,而是说确定它是这样,并且是可预期的,如更新变量;我们可以放心的依赖这个机制;
- 当x既不比y早也不比y晚时,我们说x和y并发。这不意味着,x和y一定同时发生,只说明我们无法确定它们的顺序。(费曼学习法:量子不确定性);
- 通过通道发送消息有两个重要的方面需要考虑:每一条消息有一个值,但有时候也强调通信本身、通信发生的时间,此时通常把消息叫作事件(event)(费曼学习法:OpenAPI WebHook来订单的事件)。当事件没有携带额外的信息时,它单纯的目的是进行同步。我们通过使用一个
struct{}
元素类型的通道来强调它,尽管通常使用bool或int类型的通道来做相同的事情,因为done<-1
比done<-struct{}{}
要短; - 示例:netcat3.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
func main() { conn, err := net.Dial("tcp", "localhost:8000") if err != nil { log.Fatal(err) } done := make(chan struct{}) go func() { // go语句调用了一个函数字面量(匿名函数),这是Go语言中启动goroutine常用的形式 io.Copy(os.Stdout, conn) // NOTE: ignoring errors log.Println("done") done <- struct{}{} // signal the main goroutine }() mustCopy(conn, os.Stdin) conn.Close() <-done // wait for background goroutine to finish }
管道/Pipeline #
管道(pipeline):通道可以用来连接goroutine,上一个通道的输出是下一个通道的输入;
- 第一个goroutine是counter,产生一个0, 1, 2,…的整数序列;然后通过一个管道发送给第二个goroutine(叫square),计算数值的平方;然后将结果通过另一个通道发送给第三个goroutine(叫printer),接收值并输出它们。
- 像这样的管道出现在长期运行的服务器程序中,其中通道用于在包含无限循环的goroutine之间整个生命周期中的通信。
- 没有一个直接的方式来判断是否通道已经关闭,但是这里有接收操作的一个变种,它产生两个结果:接收到的通道元素,以及一个布尔值(通常称为ok),它为true的时候代表接收成功,false表示当前的接收操作在一个关闭的并且读完的通道上。
- 第一个goroutine是counter,产生一个0, 1, 2,…的整数序列;然后通过一个管道发送给第二个goroutine(叫square),计算数值的平方;然后将结果通过另一个通道发送给第三个goroutine(叫printer),接收值并输出它们。
示例:pipeline1.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
func main() { naturals := make(chan int) squares := make(chan int) go func() { // goroutine1:计数器Counter for x := 0; ; x++ { naturals <- x } }() go func() { // goroutine2:求平方Squarer for { x := <-naturals squares <- x * x } }() for { // main goroutine:打印Printer (in main goroutine) fmt.Println(<-squares) } }
示例:pipeline2.go
- 因为上面的for死循环语法比较笨拙,而模式又比较通用,所以Go提供了range循环语法以在通道上迭代。这个语法更方便接收在通道上所有发送的值,接收完最后一个值后关闭循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
func main() { naturals := make(chan int) squares := make(chan int) // Counter go func() { for x := 0; x < 100; x++ { naturals <- x } close(naturals) }() go func() { // Squarer for { x, ok := <-naturals // channel被关闭并且没有值可接收时跳出循环 if !ok { break // channel was closed and drained } squares <- x * x } close(squares) }() go func() { for x := range naturals { // squares <- x * x } close(squares) }() // Printer (in main goroutine) for x := range squares { fmt.Println(x) } }
- 因为上面的for死循环语法比较笨拙,而模式又比较通用,所以Go提供了range循环语法以在通道上迭代。这个语法更方便接收在通道上所有发送的值,接收完最后一个值后关闭循环。
通道的close:
- 关闭每一个通道不是必需的,close操作只用于 断言/通知接收方goroutine 不再向channel发送新的数据**,所以仅仅在发送方的goroutine上才能调用close函数;**因此close一个只接收的channel将是一个编译错误;
- 通道也是可以通过GC垃圾回收器在它没有被引用时回收它(而不是根据它是否关闭);(**不要将这个close操作和对于文件的close操作混淆,**当结束的时候对每一个文件调Close方法是非常重要的)
- 试图关闭一个已经关闭的通道会导致panic宕机,就像关闭一个nil值的空通道也会导致panic宕机。关闭通道还可以作为一个广播机制;
单向通道类型 #
- 当一个channel作为一个函数参数时,它一般总是被专门用于只发送或者只接收。为了防止被滥用,Go语言的类型系统提供了单方向的channel类型,分别用于只发送或只接收的channel。
chan<- int
类型:表示一个只发送int的channel,只能发送不能接收。<-chan int
类型:表示一个只接收int的channel,只能接收不能发送。(箭头<-
和关键字chan的相对位置表明了channel的方向。)这种限制将在编译期检测。
- 任何双向通道向单向通道变量的赋值操作都将导致该隐式转换,但反过来是不行的,一旦有一个像
chan<- int
这样的单向通道,是没有办法通过它获取到引用同一个数据结构的chan int
数据类型的1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
func counter(out chan<- int) { for x := 0; x < 100; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { naturals := make(chan int) squares := make(chan int) go counter(naturals) // naturals的类型将隐式地从chan int转换成chan<- int。 go squarer(squares, naturals) printer(squares) // 类型将隐式地转换为<-chan int类型只接收型的channel }
2. 缓冲通道 #
缓冲通道有一个元素队列,队列的最大长度在创建的时候通过make的容量参数来设置:
ch = make(chan string, 3)
创建一个可以容纳三个字符串的缓冲通道,并指向它的引用;- 缓冲通道上的发送操作在队列的尾部插入一个元素;如果通道满了,发送操作会阻塞所在的goroutine直到另一个goroutine对它进行接收操作来留出可用的空间 (降级为无缓冲通道/同步通道?);
- 接收操作从队列的头部移除一个元素;如果通道是空的,执行接收操作的goroutine阻塞,直到另一个goroutine在通道上发送数据;
- 通过这个方式,通道的缓冲区解耦了发送goroutine和接收goroutine;
在某些特殊情况下,程序需要知道通道缓冲区存的容量和元素个数:
- 通道缓冲区的容量:
fmt.Println(cap(ch))
- 通道缓冲区的有效元素个数:
fmt.Println(len(ch))
; 在并发程序中元素个数会随着接收操作而立即失效,但是它对某些故障诊断和性能优化会有帮助;
- 通道缓冲区的容量:
因为语法简单,Go新手粗暴地将缓冲通道作为队列在单个goroutine中使用,但是这是个严重错误。**通道和goroutine的调度深度关联,如果没有另一个goroutine从通道进行接收,发送者(也许是整个程序)有被永久阻塞的风险。**如果仅仅需要一个简单的队列,使用slice创建一个就可以;
示例:goroutines泄漏**(费曼学习法:类似内存泄漏):**如果使用一个无缓冲通道,两个比较慢的goroutine由于发送响应结果到通道的时候没有goroutine来接收而将被永远卡住的bug;
- 和回收变量不同,泄漏的goroutines不会自动回收,因此必须确保每个goroutine在不再需要的时候可以自动结束;
1 2 3 4 5 6 7 8
func mirroredQuery() string { responses := make(chan string, 3) go func() { responses <- request("asia.gopl.io") }() // 并发地向三个镜像站点发出请求,三个镜像站点分散在不同的地理位置,它们分别将收到的响应发送到带缓存channel go func() { responses <- request("europe.gopl.io") }() go func() { responses <- request("americas.gopl.io") }() return <-responses // return the quickest response // 最后接收者只接收第一个收到的(最快的)响应,mirroredQuery函数可能在另外两个响应慢的镜像站点响应之前就返回了结果。(顺便说一下,多个goroutines并发地向同一个channel发送数据,或从同一个channel接收数据都是常见的用法。) } func request(hostname string) (response string) { /* ... */ }
无缓冲通道和缓冲通道的选择、缓冲通道容量大小的选择,都会对程序的正确性产生影响。
- 无缓冲通道提供强同步保障,因为每一次发送都需要和一次对应的接收同步;
- 对于缓冲通道,这些操作则是解耦的;如果我们知道要发送的值数量的上限,通常会创建一个容量是使用上限的缓冲通道,在接收第一个值前就完成所有的发送。在内存无法提供缓冲容量的情况下,可能导致程序死锁。
- 通道的缓冲也可能影响程序的性能:组装流水线是对于通道和goroutine合适的比喻
- 想象蛋糕店里的三个厨师,在生产线上,在把每一个蛋糕传递给下一个厨师之前,一个烤,一个加糖衣,一个雕刻。在空间比较小的厨房,每一个厨师完成一个蛋糕流程,必须等待下一个厨师准备好接受它;这个场景类似于使用无缓冲通道来通信。
- 如果在厨师之间有可以放一个蛋糕的位置,一个厨师可以将制作好的蛋糕放到这里,然后立即开始制作下一个,这类似于使用一个容量1的缓冲通道。只要厨师们以相同的速度工作,大多数工作就可以快速处理,消除他们各自之间的速率差异。
- 如果在厨师之间有更多的空间——更长的缓冲区——就可以消除更大的暂态速率波动而不影响组装流水线,比如当一个厨师稍作休息时,后面再抓紧跟上进度。
- 另一方面,如果生产线的上游持续比下游快,缓冲区满的时间占大多数。如果后续的流程更快,缓冲区通常是空的。这时缓冲区的存在是没有价值的。
- 如果第二段更加复杂,一个厨师可能跟不上第一个厨师的供应,或者跟不上第三个厨师的需求。为了解决这个问题,我们可以雇用另一个厨师来帮助第二段流程,独立地执行同样的任务。这个类似于创建另外一个goroutine使用同一个通道来通信。