背景
Go 不仅提供基于 channel 的 CSP 方式来进行 goroutine 间同步,也提供基于共享变量的同步方式,sync 包就提供了这些基础工具,本文简要介绍一下 sync 包的使用。
Once / WaitGroup / Pool 的使用
Once 的使用
定义
1
2
3
4
5
|
type Once struct {
// 非暴露字段
}
func (o *Once) Do(f func())
|
使用前先定义 Once 类型变量:
使用的时候向 Once 类型变量传入函数:
1
|
once.Do(func() { init() })
|
多次调用 once.Do(f)
只会触发一次 f
的执行,即第一次 f
的执行。
用法实例
某些操作只需要执行一次(比如一些初始化动作),这时就可使用 Once,如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
// 创建 10 个 goroutine,但是 onceBody 只会执行 1 次
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
// 等待 10 个 goroutine 结束
for i := 0; i < 10; i++ {
<-done
}
}
|
其实 Once 的实现非常简单,就是互斥锁+原子变量,待下文讲完 Mutex 后再回头看 Once 的实现。
WaitGroup 的使用
定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
type WaitGroup struct {
// 非暴露字段
}
// 全局维护一个 counter,如果 counter 等于 0,所以在 counter 上等待的 goroutine 将被释放
// delta 可以为负数,但是如果 counter < 0,将导致 panic
func (wg *WaitGroup) Add(delta int)
// 就是 counter 减 1 操作
func (wg *WaitGroup) Done()
// 阻塞等待直到 counter 为 0
func (wg *WaitGroup) Wait()
|
用法实例
WaitGroup 用来等待一组 goroutine 结束,操作流程为:
- 主 goroutine 调用
Add()
设置需要等待结束的 goroutine 的数量;
- 主 goroutine 调用
Wait()
开始阻塞直到所有 goroutine 运行结束;
- 每个 goroutine 开始运行并在结束的时候调用
Done()
;
如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func main() {
var wg sync.WaitGroup
var urls = []string{
"www.qq.com",
"www.baidu.com",
"www.taobao.com",
}
for _, url := range urls {
// 每次启动一个 goroutine 来做 http GET 动作,每启动一个就调用一次 Add()
wg.Add(1)
go func(url string) {
// 一般会将 Done() 放在 defer 中
defer wg.Done()
http.Get(url)
}(url)
}
// 主 goroutine 开始等待所有 goroutine 结束
wg.Wait()
}
|
Pool 的使用
定义
1
2
3
4
5
6
7
8
9
10
11
12
|
type Pool struct {
// 当 Get() 找不到一个对象时,会使用 New() 生成一个对象
New func() interface{}
// 剩下的是非暴露字段
}
// 任意从 pool 中挑选一个对象返回给客户端,如果找不到就使用 p.New 生成
func (p *pool) Get() interface{}
// 将对象 x 放回到 pool 中
func (p *pool) Put(x interface{})
|
用法实例
Pool 是一个临时对象保存(Put()
)和获取(Get()
)的集合,可安全地并发访问。
Pool 会将一些最近分配但还未使用的对象缓存起来供下次使用,以此来缓解 GC 压力。Pool 中的对象会自动在任何时间段中被移除而不会有任何显式的通知机制。
使用 Pool 可以用以管理一些临时对象供多个 package 的客户端使用,客户端对 Pool 的逻辑是无感知的:需要的时候 Get,不需要的时候 Put,而且 Pool 可根据当前负载自动调整对象池的大小。
一个典型的应用是日志,如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func Log(w io.Writer, key, val string) {
// 从对象池中获取 buffer
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(time.Now().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
w.Write(b.Bytes())
// 使用完毕,归还 buffer
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "/search?q=flowers")
}
|
一个简单缓存的设计(Mutex / RWMutex / Map 的使用)
一个基于 map 结构非安全缓存
如果我们想在程序中引入一个基于内存的简单的 cache,map 类型是其最佳选择。
一个具备 Get / Set 操作的简单的 cache 可如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
type Cache struct {
data map[interface{}]interface{}
}
func NewCache() *Cache {
return &Cache{
data: make(map[interface{}]interface{}),
}
}
func (c *Cache) Get(key interface{}) (value interface{}, ok bool) {
if c.data != nil {
v, ok := c.data[key]
return v, ok
}
return nil, false
}
func (c *Cache) Set(key, value interface{}) bool {
if c.data != nil {
c.data[key] = value
return true
}
return false
}
func (c *Cache) Dump() {
if c.data != nil {
for k, v := range c.data {
fmt.Printf("%v:%v\n", k, v)
}
}
}
func main() {
cache := NewCache()
cache.Set("Hello", "World")
cache.Set(1, 2)
cache.Set(1.12, 2.4)
cache.Set(true, false)
cache.Dump()
}
|
初步运行一下,发现似乎没什么问题。让我们修改一下 main()
,增加多个 goroutine 并发读写:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func worker(c *Cache, k, v interface{}) {
c.Set(k, v)
storedValue, ok := c.Get(k)
if !ok {
fmt.Printf("store %v:%v error\n", k, v)
}
if storedValue != v {
fmt.Printf("store %v:%v error, want: %v, got: %v\n", k, v, v, storedValue)
}
}
func main() {
cache := NewCache()
for i := 0; i < 10; i++ {
k := "key-" + strconv.Itoa(i)
v := i
go worker(cache, k, v)
}
time.Sleep(1 * time.Second)
cache.Dump()
}
|
再次运行一下,发生如下错误:
1
2
3
4
5
6
7
8
9
|
fatal error: concurrent map writes
fatal error: concurrent map writes
goroutine 8 [running]:
runtime.throw(0x10b684c, 0x15)
/usr/local/Cellar/go@1.8/1.8.5/libexec/src/runtime/panic.go:596 +0x95 fp=0xc420027638 sp=0xc420027618
runtime.mapassign(0x109d580, 0xc420018120, 0xc420027738, 0x0)
/usr/local/Cellar/go@1.8/1.8.5/libexec/src/runtime/hashmap.go:499 +0x667 fp=0xc4200276d8 sp=0xc420027638
...
|
查阅参考资料 1 发现,Go 原生的 map 并不支持并发读写,所以会发生上述错误。让我们再做进一步改造。
用 Mutex 改造成安全并发版本
Mutex 的思路:进入临界区时上锁,让其他操作者无法进入;离开临界区开锁,让其他操作者有机会进入。这样,临界区无论在任何时候都只有一个操作者。
Go 为这类上锁开锁动作抽象了一个 interface:
1
2
3
4
|
type Locker interface {
Lock()
UnLock()
}
|
其中 Mutex 和 RWMutex 均实现了 Locker 接口。
Mutex 的定义:
1
2
3
4
5
6
7
8
9
|
type Mutex struct {
// 非暴露字段
}
// 上锁,如果锁已经被其他人占用,将在此阻塞住直至锁可用
func (m *Mutex) Lock()
// 开锁,锁不与任何 goroutine 关联,即任何 goroutine 都可以开锁
func (m *Mutex) Unlock()
|
我们在 Cache 的数据结构中加入 Mutex,并在 Get 和 Set 操作中分别上锁和开锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
type Cache struct {
m sync.Mutex
data map[interface{}]interface{}
}
func (c *Cache) Get(key interface{}) (value interface{}, ok bool) {
c.m.Lock()
defer c.m.Unlock()
...
}
func (c *Cache) Set(key, value interface{}) bool {
c.m.Lock()
defer c.m.Unlock()
...
}
|
再次运行程序,一切正常。
用 RWMutex 替换 Mutex
在大多数场景下,操作者都可分为两种:读者和写者。在读密集的应用中,如果为了读操作而去抢夺一把锁,将会造成性能的大幅下降(因为大部分 goroutine 都在等待锁)。为了缓解这一情况,我们引入读写锁的概念:
区别于 Mutex 的结构,RWMutex 多了 RLock 和 RUnlock,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
type RWMutex struct {
// 未暴露字段
}
// 写者上锁
func (rw *RWMutex) Lock()
// 写者开锁
func (rw *RWMutex) Unlock()
// 读者上锁,如果此时有等待的写锁,读锁不可用
func (rw *RWMutex) RLock()
// 读者开锁
func (rw *RWMutex) RUnlock()
|
对于 Cache,只要稍加改动,在 Get()
(读者)的时候使用读锁,在 Set()
(写者)的时候使用写锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
type Cache struct {
rw sync.RWMutex
data map[interface{}]interface{}
}
func (c *Cache) Get(key interface{}) (value interface{}, ok bool) {
c.rw.RLock()
defer c.rw.RUnlock()
...
}
func (c *Cache) Set(key, value interface{}) bool {
c.rw.Lock()
defer c.rw.Unlock()
...
}
|
下面再用一个实例来简单介绍一下 RWMutex 的几条规则:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
var rw sync.RWMutex
func reader(readerID int) {
fmt.Printf("[reader-%d] try to get read lock\n", readerID)
rw.RLock()
fmt.Printf("[reader-%d] get read lock and sleep\n", readerID)
time.Sleep(1 * time.Second)
fmt.Printf("[reader-%d] release read lock\n", readerID)
rw.RUnlock()
}
func writer(writerID int) {
fmt.Printf("[writer-%d] try to get write lock\n", writerID)
rw.Lock()
fmt.Printf("[writer-%d] get write lock and sleep\n", writerID)
time.Sleep(3 * time.Second)
fmt.Printf("[writer-%d] release write lock\n", writerID)
rw.Unlock()
}
func main() {
// 启动多个 goroutine 获取 read lock 后 sleep 一段时间
// 由于此时没有写者,所以两个 reader 都可以同时获取到读锁
go reader(1)
go reader(2)
time.Sleep(500 * time.Millisecond)
// 写者获取写锁,由于读锁未被释放,所以一开始写者获取不到写锁
go writer(1)
time.Sleep(1 * time.Second)
// 由于写锁还未释放,新的读者获取不到读锁
go reader(3)
go reader(4)
// 主 goroutine 等待足够长时间让所有 goroutine 执行完
time.Sleep(10 * time.Second)
}
|
执行后输出为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
[reader-2] try to get read lock
[reader-1] try to get read lock
[reader-2] get read lock and sleep
[reader-1] get read lock and sleep
[writer-1] try to get write lock --> 尝试获取写锁失败,因为读锁未释放
[reader-2] release read lock --> 读锁释放
[reader-1] release read lock
[writer-1] get write lock and sleep --> 读锁释放后,获取写锁成功
[reader-4] try to get read lock --> 获取读锁失败因为写锁未释放
[reader-3] try to get read lock
[writer-1] release write lock --> 写锁释放
[reader-3] get read lock and sleep --> 写锁释放后,获取读锁成功
[reader-4] get read lock and sleep
[reader-4] release read lock
[reader-3] release read lock
|
使用 Map
正如上文说是,Go 原生的 map 数据类型是非并发安全的,当进行并发读写时,其行为是未定义的,因此,sync 包提供了 Map。
定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
type Map struct {
// 非暴露字段
}
// 删除指定 key
func (m *Map) Delete(key interface{})
// 获取指定 key 的 value,即 Get 操作
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
// 如果存在指定 key 则返回对应的 value,且 loaded 为 true
// 如果不存在对应的 key 则存储传入的 {key, value},且 loaded 为 false
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
// 设置指定 {key, value},即 Set 操作
func (m *Map) Store(key, value interface{})
// 对 Map 上每个 {key, value} 执行函数 f,如果返回 false,结束迭代
func (m *Map) Range(f func(key, value interface{}) bool)
|
用法实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
var wg sync.WaitGroup
func main() {
var m = sync.Map{}
// 测试并发读写是否正常
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := "key-" + strconv.Itoa(i)
value := "value-" + strconv.Itoa(i)
m.Store(key, value)
v, ok := m.Load(key)
if !ok {
fmt.Printf("get %v failed\n", key)
}
if v != value {
fmt.Printf("want: %v, got: %v", value, v)
}
}(i)
}
wg.Wait()
// 进行 range 迭代的顺序于元素添加顺序无关
m.Range(doSomething)
// 删除 key-7
m.Delete("key-7")
// 此时应该无法读取到 key-7
_, ok := m.Load("key-7")
if ok != false {
fmt.Println("delete failed")
}
}
func doSomething(k, v interface{}) bool {
// 遇到指定 key 的时候返回 false 此时迭代结束
if k != "key-7" {
fmt.Printf("key: %v, value: %v\n", k, v)
return true
}
return false
}
|
Cond 的使用
什么是条件变量
条件变量做的事情很简单:让多个 goroutine 等待在某个条件上,如果条件不满足,进入等待状态;如果条件满足,继续运行。
Cond 内部维护着一个 notifyList
,当条件不满足的时候,则将对应的 goroutine 添加到列表上然后进入等待状态。当条件满足时,一般会有其他执行者显式使用 Signal()
或者 Broadcast()
去唤醒 notifyList
上 goroutine。
当进行条件的判断时,必须使用互斥锁来保证条件的安全,即在判断的时候条件没有被其他人修改。所以 Cond 一般会与一个符合 Lock 接口的 Mutex 一起使用。
Cond 的使用
定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
type Cond struct {
// 读写条件状态需要加锁
L Locker
// 剩下的是非暴露字段
}
func NewCond(l Locker) *Cond
// 广播所以等待的 goroutine,条件已经满足
func (c *Cond) Broadcast()
// 单播其中一个等待的 goroutine,条件已经满足
func (c *Cond) Signal()
// 如果条件不满足,调用 Wait() 进入等待状态
func (c *Cond) Wait()
|
此处要特别小心 Wait()
的使用。正如前文所说,条件的判断需要使用互斥锁来确保条件读取前后是一致的,即:
1
2
3
4
5
6
7
|
c.L.Lock() // 进行条件判断,加锁
if !condition() { // 如果不满足条件,进入 if 中
c.Wait() // Wait() 内部会自动解锁
}
... 这里可能会对 condition 作出改变 ...
c.L.Unlock()
|
上述代码其实还有一个很严重的问题,为了说明这个问题,让我们来看看 Wait()
的实现:
1
2
3
4
5
6
7
|
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify) // 加入 notifyList
c.L.Unlock() // 解锁
runtime_notifyListWait(&c.notify, t) // 进入等待模式
c.L.Lock() // 运行到此处说明条件已经满足,开始获取互斥锁,如果锁已经被别人用了,开始等待
}
|
从上面的例子可以看出,当 Wait()
返回时(即已经获取到了互斥锁),有可能条件已经被其他先获取互斥锁的 goroutine 改变了,所以此时必须再次判断一下条件,即:
1
2
3
4
5
6
7
8
9
10
|
c.L.Lock() // 进行条件判断,加锁
if !condition() { // 如果不满足条件,进入 if 中
c.Wait() // Wait() 内部会自动解锁
}
if !condition() { // 如果不满足条件,进入 if 中
c.Wait() // Wait() 内部会自动解锁
}
... 这里可能会对 condition 作出改变 ...
c.L.Unlock()
|
如果代码这么写,就太费劲了,上面代码可以简化为:
1
2
3
4
5
6
7
|
c.L.Lock() // 进行条件判断,加锁
for !condition() { // 如果不满足条件,进入 if 中
c.Wait() // Wait() 内部会自动解锁
}
... 这里可能会对 condition 作出改变 ...
c.L.Unlock()
|
即将 if
替换为 for
,从而当从 Wait()
返回时,再次判断条件是否满足。
用法实例
用一个简单的例子来介绍一下 Cond 如何使用,即:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
var (
wakeup = false
workerNum = 3
)
func worker(workerID int, c *sync.Cond) {
fmt.Printf("Worker [%d] is RUNNING\n", workerID)
c.L.Lock()
for !wakeup {
fmt.Printf("Worker [%d] check conditon\n", workerID)
c.Wait()
}
fmt.Printf("Worker [%d] wakeup, DO something\n", workerID)
// 将唤醒标志改为 false
// 此时其他已经醒来并抢夺互斥锁的 goroutine 重新判断条件后
// 将再次进入 wait 状态
wakeup = false
c.L.Unlock()
}
func main() {
cond := sync.NewCond(&sync.Mutex{})
for i := 0; i < workerNum; i++ {
go worker(i, cond)
}
time.Sleep(2 * time.Second)
wakeup = true
cond.Broadcast() // 向所有 goroutine 进行广播,条件已经满足,即 wakeup = true
time.Sleep(2 * time.Second)
}
|
执行后的输出为:
1
2
3
4
5
6
7
8
9
|
Worker [0] is RUNNING
Worker [1] is RUNNING
Worker [0] check conditon
Worker [1] check conditon
Worker [2] is RUNNING
Worker [2] check conditon
Worker [0] wakeup, DO something
Worker [1] check conditon
Worker [2] check conditon
|
当 worker0 醒来后,又重新把条件变量进行了修改,从而导致 worker1 和 worker2 获取到互斥锁后重新检查到条件不满足,再次进入 wait 状态。
参考
- Go maps in action
- Package sync