3.2 数据竞争

3.2 数据竞争 #

通过共享内存来通信、数据竞争、互斥锁、读写互斥锁、内存同步

  1. 串行与并行程序的区别
  • 串行程序(单goroutine):步骤由程序逻辑决定执行顺序
  • 并行程序(多goroutine):各goroutine内部顺序执行,但goroutine间事件顺序不确定(量子不确定性)
  1. 并发安全(concurrency-safe)函数/类型
  • 定义:在没有额外同步机制的情况下,多个goroutine同时调用仍能正确工作的函数
  • 并发安全类型:所有可访问方法和操作都是并发安全的类型(这是特例而非常态)
  1. 并发问题与竞争条件
  • 主要包括:死锁(deadlock)、活锁(livelock)和资源耗尽(resource starvation)、竞争条件等

  • 竞争条件的定义:程序在多个goroutine按某些交错顺序执行时程序无法给出正确的结果

  • 数据竞争的定义:在两个以上的goroutine并发访问相同的变量且至少其中一个为写操作时发生

  • 竞争条件是最危险的,因为:

    • 难以重现和分析
    • 可能仅在特定条件下出现
  • 如果数据竞争的对象是一个比一个机器字(译注:32位机器上一个字=4个字节)更大的类型时更加噩梦,如interface、string、slice类型

    • 示例:如并发地更新两个不同长度的slice
      1
      2
      3
      4
      
      var x []int
      go func() { x = make([]int, 10) }()
      go func() { x = make([]int, 1000000) }()
      x[999999] = 1 // NOTE: undefined behavior; memory corruption possible!
      
    • 最后一个语句中的x的值是未定义的;其可能是nil,或者也可能是一个长度为10的slice,也可能是一个长度为1,000,000的slice。但是回忆一下slice的三个组成部分:指针(pointer)、长度(length)和容量(capacity)。如果指针是从第一个make调用来,而长度从第二个make来,x就变成了一个混合体,一个自称长度为1,000,000但实际上内部只有10个元素的slice。这样导致的结果是存储999,999元素的位置会碰撞一个遥远的内存位置,这种情况下难以对值进行预测,而且debug也会变成噩梦。这种语义雷区被称为未定义行为,对C程序员来说应该很熟悉;幸运的是在Go语言里造成的麻烦要比C里小得多。
    • 尽管并发程序的概念让我们知道并发并不是简单的语句交叉执行。甚至一些非常聪明的程序员也还是会偶尔提出一些理由来允许数据竞争,比如:“互斥条件代价太高”,“这个逻辑只是用来做logging”,“我不介意丢失一些消息”等等。因为在他们的编译器或者平台上很少遇到问题,可能给了他们错误的信心。一个好的经验法则是根本就没有什么所谓的良性数据竞争。所以我们一定要避免数据竞争,那么在我们的程序中要如何做到呢?
  • 示例:并发存款导致Bob存的100元丢了的问题

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    // Package bank implements a bank with only one account.
    package bank
    var balance int
    func Deposit(amount int) { balance = balance + amount }
    func Balance() int { return balance }
    // Alice:
    go func() {
        bank.Deposit(200)                // A1
        fmt.Println("=", bank.Balance()) // A2
    }()
    // Bob:
    go bank.Deposit(100)                 // B
    
    • 情况一:Alice先执行:最终的银行总余额balance=300
    • 情况二:Bob先执行:最终的银行总余额balance=300
    • 情况三:交错执行:最终的银行总余额balance=300
    • 情况四:数据竞争(Data race):只要有两个goroutine并发访问同一变量,且至少其中的一个是写操作时,就会发生数据竞争
      • Alice执行Deposit(200)的read:balance + amount = 0 +200 = 200
      • Bob执行存款Deposit(100)的read+write:100
      • Alice执行Deposit(200)的write:balance=200,共享变量balance=200(Bob存的100元丢了)

避免数据竞争的方法 #

方法一:不要去写变量(提前初始化,然后只读) #

1
2
3
4
5
6
7
8
9
var icons = map[string]image.Image{
    "spades.png":   loadIcon("spades.png"),
    "hearts.png":   loadIcon("hearts.png"),
    "diamonds.png": loadIcon("diamonds.png"),
    "clubs.png":    loadIcon("clubs.png"),
}

// Concurrency-safe.
func Icon(name string) image.Image { return icons[name] }

方法二:通过channel限制在单个goroutine中访问变量 #

  • 如前面的并发web爬虫的main goroutine是唯一一个能够访问seen map的goroutine
  • 由于其它的goroutine不能够直接访问变量,只能使用一个channel来发送请求给指定的goroutine来查询更新变量。这也就是Go的口头禅“不要使用共享数据来通信;而是通过channel通信来共享数据。”(“Do not communicate by sharing memory; instead, share memory by communicating.”)
  • 一个提供对一个指定的变量通过channel来请求的goroutine叫做这个变量的monitor(监控)goroutine。例如broadcaster goroutine会监控clients map的全部访问
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Package bank provides a concurrency-safe bank with one account.
package bank

var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance

func Deposit(amount int) { deposits <- amount }
func Balance() int       { return <-balances }

func teller() {
    var balance int // balance is confined to teller goroutine
    for {
        select {
        case amount := <-deposits:
            balance += amount
        case balances <- balance:
        }
    }
}

func init() {
    go teller() // monitor goroutine  
}
  • 串行绑定:
    • 即使当一个变量无法在其整个生命周期内被绑定到一个独立的goroutine,绑定依然是并发问题的一个解决方案。
    • 如在一条流水线上的goroutine之间共享变量是很普遍的行为,在这两者间会通过channel来传输地址信息。如果流水线的每一个阶段都能够避免在将变量传送到下一阶段后再去访问它,那么对这个变量的所有访问就是线性的。其效果是变量会被绑定到流水线的一个阶段,传送完之后被绑定到下一个,以此类推。
  • 串行绑定示例:Cakes会被严格地顺序访问,先是baker gorouine,然后是icer gorouine:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    type Cake struct{ state string }
    func baker(cooked chan<- *Cake) {
        for {
            cake := new(Cake)
            cake.state = "cooked"
            cooked <- cake // baker never touches this cake again
        }
    }
    func icer(iced chan<- *Cake, cooked <-chan *Cake) {
        for cake := range cooked {
            cake.state = "iced"
            iced <- cake // icer never touches this cake again
        }
    }
    

方法三:加互斥锁,同一个时刻只允许一个goroutine访问变量 #

  • 二元信号量(binary semaphore): 一个只能为1和0的信号量。这种互斥很实用,而且被sync包里的Mutex类型直接支持。它的Lock方法能够获取到token(这里叫锁),并且Unlock方法会释放这个token

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
    var (
        sema    = make(chan struct{}, 1) // a binary semaphore guarding balance
        balance int
    )
    func Deposit(amount int) {
        sema <- struct{}{} // acquire token
        balance = balance + amount
        <-sema // release token
    }
    func Balance() int {
        sema <- struct{}{} // acquire token
        b := balance
        <-sema // release token
        return b
    }
    
  • bank程序:

    • mu.Lock(): 如果其它的goroutine已经获得了这个锁的话,这个操作会被阻塞直到其它goroutine调用了Unlock使该锁变回可用状态
    • 临界区:**在Lock和Unlock之间的代码段,**goroutine可以随便读取或者修改。锁的持有者在其他goroutine获取该锁之前需要调用Unlock。goroutine在结束后释放锁是必要的,无论以哪条路径通过函数都需要释放,即使是在错误路径中,也要记得释放。
    • 例证了一种通用的并发模式:一系列的导出函数封装了一个或多个变量,那么访问这些变量唯一的方式就是通过这些函数来做(或者方法,对于一个对象的变量来说)。每一个函数在一开始就获取互斥锁并在最后释放锁,从而保证共享变量不会被并发访问。这种函数、互斥锁和变量的编排叫作监控monitor(这种老式单词的monitor是受“monitor goroutine”的术语启发而来的。两种用法都是一个代理人保证变量被顺序访问)。
    • defer调用只会比显式地调用Unlock成本高那么一点点,不过却在很大程度上保证了代码的整洁性。大多数情况下对于并发程序来说,代码的整洁性比过度的优化更重要。如果可能的话尽量使用defer来将临界区扩展到函数的结束。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
    import "sync"
    var (
        mu      sync.Mutex // guards balance
        balance int   // 按**惯例,被mutex所保护的变量跟在mutex变量声明之后声明**
    )
    func Deposit(amount int) {
        mu.Lock()
        balance = balance + amount
        mu.Unlock()
    }
    func Balance() int {
        mu.Lock()   // 获取一个互斥锁
        defer mu.Unlock() 
        return balance
    }
    
  • Go语言的互斥量是不可再入的,Go中没有重入锁(参考Java重入锁):没法对一个已经锁上的mutex来再次上锁,这会导致程序死锁一直被卡住;mutex不能重入的原因:

    • mutex互斥量的目的是确保共享变量在程序执行时的关键点上能够保证不变性。不变性的一层含义是“没有goroutine访问共享变量”,但实际上这里对于mutex保护的变量来说,不变性还包含更深层含义:当一个goroutine获得了一个互斥锁时,它能断定被互斥锁保护的变量正处于不变状态(译注:即没有其他代码块正在读写共享变量),在其获取并保持锁期间,可能会去更新共享变量,这样不变性只是短暂地被破坏,然而当其释放锁之后,锁必须保证共享变量重获不变性并且多个goroutine按顺序访问共享变量。尽管一个可以重入的mutex也可以保证没有其它的goroutine在访问共享变量,但它不具备不变性更深层含义。(译注: 更详细的解释,Russ Cox认为可重入锁是bug的温床,是一个失败的设计)
  • Go中互斥量(mutex)不可重入的设计原因,可以总结为以下几点:

    • 互斥量的核心目的是确保共享变量在关键点上的不变性,不变性包含两层含义:
      • 基本含义:确保没有其他goroutine同时访问共享变量
      • 深层含义:
        • 获得锁时,变量处于有效的不变状态
        • 持有锁期间可以临时破坏不变性
        • 释放锁前必须恢复变量的不变性
    • 虽然可重入锁也能防止并发访问,但它无法保证这种更深层的不变性保证,因此Go语言设计者认为可重入锁容易导致bug
  • 一个通用的解决方案是将Deposit拆分成两个:

    • 一个不导出的函数deposit,这个函数假设锁总是会被保持并去做实际的操作
    • 另一个是导出的函数Deposit,这个函数会调用deposit,但在调用前会先去获取锁
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      
      // This function requires that the lock be held.
      func deposit(amount int) { balance += amount }  // 存款
      func Deposit(amount int) {
          mu.Lock()
          defer mu.Unlock()
          deposit(amount)
      }
      func Withdraw(amount int) bool {  // 取款
          mu.Lock()
          defer mu.Unlock()
          deposit(-amount)
          if balance < 0 {
              deposit(amount)
              return false // insufficient funds
          }
          return true
      }
      func Balance() int {
          mu.Lock()
          defer mu.Unlock()
          return balance
      }
      
  • 封装即通过在程序中减少对数据结构的非预期交互,来帮助我们保证数据结构中的不变量。因为类似的原因,封装也可以用来保持并发中的不变性。所以无论是为了保护包级别的变量,还是结构中的字段,当你使用一个互斥量时,都请确保互斥量本身以及被保护的变量都没有导出。(即小写字母开头,且不要被大写字母开头的函数访问)

读写互斥锁:sync.RWMutex #

  • “多读单写”锁(multiple readers, single writer lock),Go语言提供的这样的锁是sync.RWMutex:允许多个只读操作并行执行,但写操作会完全互斥
    1
    2
    3
    4
    5
    6
    7
    
    var mu sync.RWMutex
    var balance int
    func Balance() int {
        mu.RLock() // readers lock
        defer mu.RUnlock()
        return balance
    }
    
  • RLock只能在临界区共享变量没有任何写入操作时可用。一般来说,我们不应该假设逻辑上的只读函数/方法也不会去更新某一些变量。如一个方法功能是访问一个变量,但它也有可能会同时去给一个内部的计数器+1(译注:可能是记录这个方法的访问次数啥的),或者去更新缓存——使即时的调用能够更快。如果有疑惑的话,请使用互斥锁
  • RWMutex只有当获得锁的大部分goroutine都是读操作,而锁在竞争条件下,也就是说,goroutine们必须等待才能获取到锁的时候,RWMutex才是最能带来好处的。RWMutex需要更复杂的内部记录,所以会让它比一般的无竞争锁的mutex慢一些。

内存同步 #

Balance()(即Java中的GetBalance()写法)作为读方法需要互斥锁的原因:

  • 原因1:防止Balance插到其他操作中间(如Withdraw())
  • 原因2:同步不仅涉及多个goroutine的执行顺序问题,同步还会影响到内存
    • 现代的计算机一般都会有多个CPU
    • 每个CPU都有内存的本地缓存,为了提高效率,数据会先写入CPU缓存,而不是直接写入主内存
    • 当使用channel或互斥锁等同步机制/同步原语时,CPU会被强制将缓存中的数据写回主内存,这样其他CPU上运行的goroutine就能看到这些更新的数据(很微妙,精彩!)
  • 示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    var x, y int
    go func() {
        x = 1 // A1
        fmt.Print("y:", y, " ") // A2
    }()
    go func() {
        y = 1                   // B1
        fmt.Print("x:", x, " ") // B2
    }()
    
    可能的并发问题如下,在某些特定的编译器、CPU或者其他情况下,最后两种确实可能发生:
    1
    2
    3
    4
    5
    6
    
    y:0 x:1
    x:0 y:1
    x:1 y:1
    y:1 x:1
    x:0 y:0
    y:0 x:0
    
    • 在单个goroutine内每个语句顺序执行,但在缺乏使用channel或者互斥量mutex来显式同步的情况下,就没法保证事件在不同的goroutine中看到的执行顺序是一致的了。
      • 尽管goroutine A中一定需要观察到x=1执行成功之后才会去读取y,但它没法确保自己观察得到goroutine B中对y的写入,所以A还可能会打印出y的一个旧版的值0。
    • 尽管去理解并发的一种尝试是去将其运行理解为不同goroutine语句的交错执行,但看看上面的例子,这已经不是现代的编译器和cpu的工作方式了。因为赋值和Print打印指向不同的变量,编译器可能会断定两条语句的顺序不会影响执行结果,并且会交换两个语句的执行顺序
      • 如果两个goroutine在不同的CPU上执行,每一个CPU有自己的缓存,那么一个goroutine的写入操作在同步到内存之前对另外一个goroutine的Print语句是不可见的(所以从内存读取到的还是初始值0。很微妙,精彩!)。
    • 尽管很容易把并发简单理解为多个goroutine中语句的某种交错执行方式,但正如上面的例子所显示的,这并不是一个现代编译器和CPU的工作方式。因为赋值和Print对应不同的变量,所以编译器就可能会认为两个语句的执行顺序不会影响结果,然后就交换了这两个语句的执行顺序。CPU也有类似的问题,如果两个goroutine在不同的CPU上执行,每个CPU都有自己的缓存,那么一个goroutine的写入操作
    • 这些并发问题都可以通过采用简单、成熟的模式来避免。所以可能的话,将变量限定在goroutine内部;如果是多个goroutine都需要访问的变量,使用互斥锁来访问。

sync.Once 惰性初始化/延迟初始化(lazy initialization) #

  • 延迟一个昂贵/高成本的初始化步骤到有实际需求的时刻是一个很好的实践:预先初始化一个变量会增加程序的启动延时,并且如果实际执行时有可能根本用不上这个变量,那么初始化也不是必需的。
  • 如果一个变量只被一个单独的goroutine所访问的话,我们可以使用厦门的这种模板
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    var icons map[string]image.Image
    func loadIcons() {
        icons = map[string]image.Image{
            "spades.png":   loadIcon("spades.png"),
            "hearts.png":   loadIcon("hearts.png"),
            "diamonds.png": loadIcon("diamonds.png"),
            "clubs.png":    loadIcon("clubs.png"),
        }
    }
    // NOTE: not concurrency-safe!
    func Icon(name string) image.Image {
        if icons == nil {
            loadIcons() // one-time initialization  
        }
        return icons[name]
    }
    
    • 但这种模板在Icon被并发调用时并不安全。就像前面银行的那个Deposit(存款)函数一样,Icon函数也是由多个步骤组成的:首先测试icons是否为空,然后load这些icons,之后将icons更新为一个非空的值。
    • 直觉会告诉我们最差的情况是loadIcons函数被多次访问会带来数据竞争。当第一个goroutine在忙着loading这些icons的时候,另一个goroutine进入了Icon函数,发现变量是nil,然后也会调用loadIcons函数。
    • 不过这种直觉是错误的。(我们希望你从现在开始能够构建自己对并发的直觉,也就是说对并发的直觉总是不能被信任的!如前面的内存同步)。
      • 因为缺少显式的同步,编译器和CPU是可以随意地去更改访问内存的指令顺序,以任意方式,只要保证每一个goroutine自己的执行顺序一致。
      • 其中一种可能loadIcons的语句重排是下面这样。它会在填写icons变量的值之前先用一个空map来初始化icons变量。
      • 因此,一个goroutine在检查icons是非空时,也并不能就假设这个变量的初始化流程已经走完了(译注:可能只是塞了个空map,里面的值还没填完,也就是说填值的语句都没执行完呢)。
      1
      2
      3
      4
      5
      6
      7
      
      func loadIcons() {
          icons = make(map[string]image.Image
          icons["spades.png"] = loadIcon("spades.png")
          icons["hearts.png"] = loadIcon("hearts.png")
          icons["diamonds.png"] = loadIcon("diamonds.png")
          icons["clubs.png"] = loadIcon("clubs.png")
      }
      
    • 最简单且正确的保证所有goroutine能够观察到loadIcons效果的方式,是用一个mutex来同步检查。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      
      var mu sync.Mutex // guards icons
      var icons map[string]image.Image
      // Concurrency-safe.
      func Icon(name string) image.Image {
          mu.Lock()
          defer mu.Unlock()
          if icons == nil {
              loadIcons()
          }
          return icons[name]
      }
      
    • 然而使用互斥访问icons的代价就是没有办法对该变量进行并发访问,即使变量已经被初始化完毕且再也不会进行变动。这里我们可以引入一个允许多读的锁:
      • 下面代码有两个临界区。goroutine首先会获取一个读锁,查询map,然后释放锁。如果条目被找到了(一般情况下),那么会直接返回。如果没有找到,那goroutine会获取一个写锁。不释放共享锁的话,也没有任何办法来将一个共享锁升级为一个互斥锁,所以我们必须重新检查icons变量是否为nil,以防止在执行这一段代码的时候,icons变量已经被其它gorouine初始化过了。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      
      var mu sync.RWMutex // guards icons
      var icons map[string]image.Image
      // Concurrency-safe.
      func Icon(name string) image.Image {
          mu.RLock()
          if icons != nil {
              icon := icons[name]
              mu.RUnlock()
              return icon
          }
          mu.RUnlock()
          // acquire an exclusive lock
          mu.Lock()
          if icons == nil { // NOTE: must recheck for nil
              loadIcons()
          }
          icon := icons[name]
          mu.Unlock()
          return icon
      }
      
    • 上面的模板使我们的程序能够更好的并发,但是有一点太复杂且容易出错。幸运的是,sync包为我们提供了一个专门的方案来解决这种一次性初始化的问题:sync.Once。概念上来讲,一次性的初始化需要一个互斥量mutex和一个boolean变量来记录初始化是不是已经完成了;互斥量用来保护boolean变量和客户端数据结构。Do这个唯一的方法需要接收初始化函数作为其参数。让我们用sync.Once来简化前面的Icon函数吧:
      • 每一次对Do(loadIcons)的调用都会锁定mutex,并会检查boolean变量(译注:Go1.9中会先判断boolean变量是否为1(true),只有不为1才锁定mutex,不再需要每次都锁定mutex)。
      • 在第一次调用时,boolean变量的值是false,Do会调用loadIcons并会将boolean变量设置为true。随后的调用什么都不会做,但是mutex同步会保证loadIcons对内存(这里其实就是指icons变量啦)产生的效果能够对所有goroutine可见。用这种方式来使用sync.Once的话,我们能够避免在变量被构建完成之前和其它goroutine共享该变量。
      1
      2
      3
      4
      5
      6
      7
      
      var loadIconsOnce sync.Once
      var icons map[string]image.Image
      // Concurrency-safe.
      func Icon(name string) image.Image {
          loadIconsOnce.Do(loadIcons)
          return icons[name]
      }
      

竞态检测器 #

  • 即使使以最大努力的仔细,仍然很容易在并发上犯错误。幸运的是,Go语言运行时runtime和工具链装备了一个精致并易于使用的动态分析工具:竞态检测器/竞争检查起(race detector)
  • 只要在go build,go run或者go test命令后面加上-race的参数,它会让编译器为你的应用或测试构建一个修改后的版本,并且会记录在执行时对共享变量的所有访问,以及读写这些变量的goroutine标识
  • 除此之外,修改后的版本还会记录所有的同步事件,包括go语句、channel操作、(*sync.Mutex).Lock调用、(*sync.WaitGroup).Wait调用等。(完整的同步事件集合可以在语言规范中的“The Go Memory Model”文档中找到。)
  • 竞态检测器会研究事件流,找到那些有问题的案例,即一个goroutine写入一个变量后,中间没有任何同步的操作,就有另外一个goroutine读写了该变量。这种案例表明有对共享变量的并发访问,即数据竞态。
  • 这个工具会输出一份报告,包括变量的标识以及读写goroutine当时的调用栈。通常情况下这些信息足以定位问题了。在9.7节就有一个竞态检测器的示例。竞态检测器报告所有实际运行了的数据竞态。
  • 然而,它只能检测到那些在运行时发生的竞态,无法用来保证肯定不会发生竞态。为了获得最佳效果,请确保你的测试包含了并发使用包的场景由于存在额外的簿记工作,带竞态检测功能的程序在执行时需要更长的时间和更多的内存,但即使对于很多生产环境的任务,这种额外开支也是可以接受的。对于那些不常发生的竞态,使用竞态检测器可以帮你节省数小时甚至数天的调试时间。

示例: 并发的非阻塞缓存 #

本节中我们会做一个无阻塞的缓存,这种工具可以帮助我们来解决现实世界中并发程序出现但没有现成的库可以解决的问题。这个问题叫作缓存(memoizing)函数(译注:Memoization的定义: memoization 一词是Donald Michie 根据拉丁语memorandum杜撰的一个词。相应的动词、过去分词、ing形式有memoiz、memoized、memoizing),也就是说,我们需要缓存函数的返回结果,这样在对函数进行调用的时候,我们就只需要一次计算,之后只要返回计算的结果就可以了。我们的解决方案会是并发安全且会避免对整个缓存加锁而导致所有操作都去争一个锁的设计。

我们将使用下面的httpGetBody函数作为我们需要缓存的函数的一个样例。这个函数会去进行HTTP GET请求并且获取http响应body。对这个函数的调用本身开销是比较大的,所以我们尽量避免在不必要的时候反复调用。

1
2
3
4
5
6
7
8
func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

最后一行稍微隐藏了一些细节。ReadAll会返回两个结果,一个[]byte数组和一个错误,不过这两个对象可以被赋值给httpGetBody的返回声明里的interface{}和error类型,所以我们也就可以这样返回结果并且不需要额外的工作了。我们在httpGetBody中选用这种返回类型是为了使其可以与缓存匹配。

下面是我们要设计的cache的第一个“草稿”:

gopl.io/ch9/memo1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo

// A Memo caches the results of calling a Func.
type Memo struct {
    f     Func
    cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}

Memo实例会记录需要缓存的函数f(类型为Func),以及缓存内容(里面是一个string到result映射的map)。每一个result都是简单的函数返回的值对儿——一个值和一个错误值。继续下去我们会展示一些Memo的变种,不过所有的例子都会遵循上面的这些方面。

下面是一个使用Memo的例子。对于流入的URL的每一个元素我们都会调用Get,并打印调用延时以及其返回的数据大小的log:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
m := memo.New(httpGetBody)
for url := range incomingURLs() {
    start := time.Now()
    value, err := m.Get(url)
    if err != nil {
        log.Print(err)
    }
    fmt.Printf("%s, %s, %d bytes\n",
    url, time.Since(start), len(value.([]byte)))
}

我们可以使用测试包(第11章的主题)来系统地鉴定缓存的效果。从下面的测试输出,我们可以看到URL流包含了一些重复的情况,尽管我们第一次对每一个URL的(*Memo).Get的调用都会花上几百毫秒,但第二次就只需要花1毫秒就可以返回完整的数据了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
$ go test -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok  gopl.io/ch9/memo1   1.257s

这个测试是顺序地去做所有的调用的。

由于这种彼此独立的HTTP请求可以很好地并发,我们可以把这个测试改成并发形式。可以使用sync.WaitGroup来等待所有的请求都完成之后再返回。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
    n.Add(1)
    go func(url string) {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        fmt.Printf("%s, %s, %d bytes\n",
        url, time.Since(start), len(value.([]byte)))
        n.Done()
    }(url)
}
n.Wait()

这次测试跑起来更快了,然而不幸的是貌似这个测试不是每次都能够正常工作。我们注意到有一些意料之外的cache miss(缓存未命中),或者命中了缓存但却返回了错误的值,或者甚至会直接崩溃。

但更糟糕的是,有时候这个程序还是能正确的运行(译:也就是最让人崩溃的偶发bug),所以我们甚至可能都不会意识到这个程序有bug。但是我们可以使用-race这个flag来运行程序,竞争检测器(§9.6)会打印像下面这样的报告:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s

memo.go的32行出现了两次,说明有两个goroutine在没有同步干预的情况下更新了cache map。这表明Get不是并发安全的,存在数据竞争。

1
2
3
4
5
6
7
8
28  func (memo *Memo) Get(key string) (interface{}, error) {
29      res, ok := memo.cache(key)
30      if !ok {
31          res.value, res.err = memo.f(key)
32          memo.cache[key] = res
33      }
34      return res.value, res.err
35  }

最简单的使cache并发安全的方式是使用基于监控的同步。只要给Memo加上一个mutex,在Get的一开始获取互斥锁,return的时候释放锁,就可以让cache的操作发生在临界区内了:

gopl.io/ch9/memo2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}

测试依然并发进行,但这回竞争检查器“沉默”了。不幸的是对于Memo的这一点改变使我们完全丧失了并发的性能优点。每次对f的调用期间都会持有锁,Get将本来可以并行运行的I/O操作串行化了。我们本章的目的是完成一个无锁缓存,而不是现在这样的将所有请求串行化的函数的缓存。

下一个Get的实现,调用Get的goroutine会两次获取锁:查找阶段获取一次,如果查找没有返回任何内容,那么进入更新阶段会再次获取。在这两次获取锁的中间阶段,其它goroutine可以随意使用cache。

gopl.io/ch9/memo3

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)

        // Between the two critical sections, several goroutines
        // may race to compute f(key) and update the map.
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}

这些修改使性能再次得到了提升,但有一些URL被获取了两次。这种情况在两个以上的goroutine同一时刻调用Get来请求同样的URL时会发生。多个goroutine一起查询cache,发现没有值,然后一起调用f这个慢不拉叽的函数。在得到结果后,也都会去更新map。其中一个获得的结果会覆盖掉另一个的结果。

理想情况下是应该避免掉多余的工作的。而这种“避免”工作一般被称为duplicate suppression(重复抑制/避免)。下面版本的Memo每一个map元素都是指向一个条目的指针。每一个条目包含对函数f调用结果的内容缓存。与之前不同的是这次entry还包含了一个叫ready的channel。在条目的结果被设置之后,这个channel就会被关闭,以向其它goroutine广播(§8.9)去读取该条目内的结果是安全的了。

gopl.io/ch9/memo4

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // This is the first request for this key.
        // This goroutine becomes responsible for computing
        // the value and broadcasting the ready condition.
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()

        e.res.value, e.res.err = memo.f(key)

        close(e.ready) // broadcast ready condition
    } else {
        // This is a repeat request for this key.
        memo.mu.Unlock()

        <-e.ready // wait for ready condition
    }
    return e.res.value, e.res.err
}

现在Get函数包括下面这些步骤了:获取互斥锁来保护共享变量cache map,查询map中是否存在指定条目,如果没有找到那么分配空间插入一个新条目,释放互斥锁。如果存在条目的话且其值没有写入完成(也就是有其它的goroutine在调用f这个慢函数)时,goroutine必须等待值ready之后才能读到条目的结果。而想知道是否ready的话,可以直接从ready channel中读取,由于这个读取操作在channel关闭之前一直是阻塞。

如果没有条目的话,需要向map中插入一个没有准备好的条目,当前正在调用的goroutine就需要负责调用慢函数、更新条目以及向其它所有goroutine广播条目已经ready可读的消息了。

条目中的e.res.value和e.res.err变量是在多个goroutine之间共享的。创建条目的goroutine同时也会设置条目的值,其它goroutine在收到"ready"的广播消息之后立刻会去读取条目的值。尽管会被多个goroutine同时访问,但却并不需要互斥锁。ready channel的关闭一定会发生在其它goroutine接收到广播事件之前,因此第一个goroutine对这些变量的写操作是一定发生在这些读操作之前的。不会发生数据竞争。

这样并发、不重复、无阻塞的cache就完成了。

上面这样Memo的实现使用了一个互斥量来保护多个goroutine调用Get时的共享map变量。不妨把这种设计和前面提到的把map变量限制在一个单独的monitor goroutine的方案做一些对比,后者在调用Get时需要发消息。

Func、result和entry的声明和之前保持一致:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

然而Memo类型现在包含了一个叫做requests的channel,Get的调用方用这个channel来和monitor goroutine来通信。requests channel中的元素类型是request。Get的调用方会把这个结构中的两组key都填充好,实际上用这两个变量来对函数进行缓存的。另一个叫response的channel会被拿来发送响应结果。这个channel只会传回一个单独的值。

gopl.io/ch9/memo5

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// A request is a message requesting that the Func be applied to key.
type request struct {
    key      string
    response chan<- result // the client wants a single result
}

type Memo struct{ requests chan request }
// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)}
    go memo.server(f)
    return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests <- request{key, response}
    res := <-response
    return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

上面的Get方法,会创建一个response channel,把它放进request结构中,然后发送给monitor goroutine,然后马上又会接收它。

cache变量被限制在了monitor goroutine ``(*Memo).server`中,下面会看到。monitor会在循环中一直读取请求,直到request channel被Close方法关闭。每一个请求都会去查询cache,如果没有找到条目的话,那么就会创建/插入一个新的条目。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests {
        e := cache[req.key]
        if e == nil {
            // This is the first request for this key.
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key) // call f(key)
        }
        go e.deliver(req.response)
    }
}

func (e *entry) call(f Func, key string) {
    // Evaluate the function.
    e.res.value, e.res.err = f(key)
    // Broadcast the ready condition.
    close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
    // Wait for the ready condition.
    <-e.ready
    // Send the result to the client.
    response <- e.res
}

和基于互斥量的版本类似,第一个对某个key的请求需要负责去调用函数f并传入这个key,将结果存在条目里,并关闭ready channel来广播条目的ready消息。使用(*entry).call来完成上述工作。

紧接着对同一个key的请求会发现map中已经有了存在的条目,然后会等待结果变为ready,并将结果从response发送给客户端的goroutien。上述工作是用(*entry).deliver来完成的。对call和deliver方法的调用必须让它们在自己的goroutine中进行以确保monitor goroutines不会因此而被阻塞住而没法处理新的请求。

这个例子说明我们无论用上锁,还是通信来建立并发程序都是可行的。

上面的两种方案并不好说特定情境下哪种更好,不过了解他们还是有价值的。有时候从一种方式切换到另一种可以使你的代码更为简洁。(译注:不是说好的golang推崇通信并发么。)