并发
- 并发: 逻辑上具备同时处理多个任务的能力。
- 并行: 物理上在同一时刻执行多个并发任务。
多线程或多进程时并行的基本条件,但单线程也可用协程做到并发。尽管协程在单个线程上通过主动切换来实现多任务并发,它也有自己的优势。除了将因阻塞而浪费的时间找回来以外,还免去了线程切换的开销。协程上运行的多个任务本质上是依旧串行的,加上可控自主,所以并不需要做同步处理。
通常情况下,用多进程来实现分布式和负载平衡,减轻单进程垃圾回收压力;用多线程抢夺更多的处理器资源。用协程来提高处理器时间片利用率。
简单将goroutine
归纳为协程并不合适。运行时创建多个线程来执行并发任务,且任务单元可被调度到其他线程并行执行。这更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。
只须在函数调用前添加go
关键字即可创建并发任务。
1 2 3 4 5
| go println("hello, world!")
go func(s string) { println(s) } ("hello, world!")
|
关键字go
并非执行并发操作,而是创建一个并发任务单元。新建任务被放置在系统队列中,等待调度器安排合适系统线程去获取执行权。当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行次序。
每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。相比系统默认MB
级别的线程栈,goroutinue
自定义栈初始仅须2 KB
,所以才能创建成千上万的并发任务。自定义栈采取按需分配策略,在需要时进行扩容,最大能到GB
规模。
与defer
一样,gorountine
也会因”延迟执行”而立即计算并复制执行参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package main
import ( "time" )
var c int
func counter() int { c++ return c }
func main() { a := 100 go func(x, y int) { time.Sleep(time.Second) println("go:", x, y) }(a, counter()) a += 100 println("main:", a, counter())
time.Sleep(time.Second * 3) }
|
Wait
进程退出时不会等待并发任务结束,可用管道(channel
)阻塞,然后发出退出信号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func main() { exit := make(chan struct{})
go func() { time.Sleep(time.Second) println("goroutine done.") close(exit) } () println("main...") <-exit println("main exit.") }
|
除关闭通道外,写入数据也可解除阻塞。
如要等待多个任务结束,推荐使用sync.WaitGroup
。通过设定计数器,让每个goroutine
在退出前递减,直至归零时接触阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func main() { var wg sync.WaitGroup
for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done()
time.Sleep(time.Second) println("goroutine", id, "done.") } (i) }
println("main...") wg.Wait() println("main exit.") }
|
尽管WaitGroup.Add
实现了原子操作,但建议在goroutine
外累加计数器,以免Add
尚未执行,Wait
已经退出。
可在多处使用Wait
阻塞,他们都能接收到通知
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func main() { var wg sync.WaitGroup wg.Add(1)
go func() { wg.Wait() println("wait exit.") } ()
go func() { time.Sleep(time.Second) println("done.") wg.Done() }() wg.Wait() println("main exit.")
}
|
GOMAXPROCE
运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。该数量默认与处理器核数相等,可用runtime.GOMAXPROCS
函数(或环境变量)修改。
Local Storage
与线程不同,goroutine
任务无法设置优先级,无法获取编号,没有局部存储(TLS), 甚至连返回值都会被抛弃。但除优先级外,其他功能都很容易实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func main() { var wg sync.WaitGroup var gs [5]struct { id int result int }
for i := 0; i < len(gs); i++ { wg.Add(1)
go func (id int) { defer wg.Done()
gs[id].id = id gs[id].result = (id + 1) * 100 } (i) }
wg.Wait() fmt.Printf("%+v\n", gs) }
|
如使用map
作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查。
Gosched
暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func main() { runtime.GOMAXPROCS(1) exit := make(chan struct{})
go func() { defer close(exit) go func() { println("b") }()
for i := 0; i < 4; i++ { println("a:", i)
if (i == 1) { runtime.Gosched() } } }() <-exit }
|
Goexit
Goexit
立即终止当前任务,运行时确保所有已注册延迟调用被执行。该函数不会影响其他并发任务,不会引发panic
, 自然也就无法捕获。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func main() { exit := make(chan struct{})
go func () { defer close(exit) defer println("a")
func () { defer func() { println("b", recover() == nil) }() func() { println("c") runtime.Goexit() println("c done.") } () println("b done.") }() println("a done.") }() <-exit println("main exit.") }
|
如果在main.main
里调用Goexit
, 它会等待其他任务结束,然后让进程直接崩溃。
无论身处哪一层,Goexit
都能立即终止整个调用堆栈,这与return
仅退出当前函数不同。标准库函数os.Exit
可终止进程,但不会执行延迟调用。
通道
Go
语言并未实现严格的并发安全。
允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致和完整性。Go
鼓励使用CSP
通道,以通信来代替内存共享,实现并发安全。
CSP: Communicating Sequential Process
通过消息来避免竞态的模型除了CSP
, 还有Actor
。但两者由较大区别
作为CSP
核心,通道(channel)是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可如果另一端未准备妥当,或消息未能及时处理时,会阻塞当前端。
相比起来,Actor
是透明的,它不在乎数据类型及通道,只要知道接收者信箱即可。默认就是异步方式,发送方对消息是否被接收和处理并不关心。
从底层实现上来说,通道知识一个队列。同步模式下,发送和接受双方配对,然后直接赋值数据给对方。如配对失败,则置入等待队列,直到另一方出现后才被唤醒。异步模式抢夺的则是数据缓冲槽。发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符时,同样加入缓冲队列,直到有另一方写入数据或腾出空槽后被唤醒。
除传递消息(数据)外,通道还常被用做事件通知。
1 2 3 4 5 6 7 8 9 10 11 12 13
| func main() { done := make(chan struct{}) c := make(chan string)
go func(){ s := <-chan println(s) close(done) } ()
c <- "hi!" <-done }
|
同步模式必须有配对操作的goroutine
出现,否则会一直阻塞。而异步模式在缓冲区未满或数据未读完前,不会阻塞。
1 2 3 4 5 6 7 8 9 10
| func main() { c := make(chan int, 3) c <- 1 c <- 2 println(<-c) println(<-c) }
|
多数时候,异步通道有助于提升性能,减少排队阻塞。
缓冲去大小仅是内部属性,不属于类型组成部分。另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或nil
。
内置函数cap
和len
返回缓冲区大小和当前已缓冲数量;而对于同步通道则都返回0;据此可判断通道时同步还是异步
1 2 3 4 5 6 7 8 9 10 11 12
| func main () { var a, b := make(chan int), make(chan int, 3)
b <- 1 b <- 2
println("a:", len(a), cap(a)) println("b:", len(b), cap(b)) }
|
收发
除使用简单的发送和接受操作符外,还可用ok-idom
或range
模式处理数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func main() { done := make(chan struct{}) c := make(chan int)
go func() { defer close(done)
for { x, ok := <-c if !ok { return }
println(x) } } ()
c <- 1 c <- 2 c <- 3 close(c) <-done }
|
一次性事件用close
效率更好,没有多余开销。连续或多样性事件,可传递不同数据标志实现。还可使用sync.Cond
实现单播或广播事件。
对于closed
或nil
通道,发送和接收操作都有相应规则:
- 向已关闭通道发送数据,引发
panci
- 从已关闭接收数据,返回已缓冲数据或零值。
- 无论收发,
nil
通道都会阻塞。
1 2 3 4 5 6 7 8 9 10 11 12
| func main() { c := make(chan int, 3)
c <- 10 c <- 20 close(c)
for i := 0; i < cap(c)+1; i++ { x, ok := <-c println(i, ":", ok, x) } }
|
重复关闭或关闭nil
通道都会引发panic
错误。
单向
通道默认时双向的,并不区分发送和接收端。
尽管可用make
创建单向通道,但那没有任何意义。通常使用类型装欢来获取单向通道,并分别赋予操作双方。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func main() { var wg sync.WaitGroup wg.Add(2)
c := make(chan int) var send chan<- int = c var recv <-chan int = c
go func() { defer wg.Done() for x := range recv { println(x) } }()
go func(){ defer wg.Done() defer close(c)
for i := 0; i < 3; i++ { send <- i } }()
wg.Wait() }
|
不能在单向通道上做逆向操作。
1 2 3 4 5 6 7 8 9
| func main(){ c := make(chan int, 2)
var send chan<- int = c var recv <-chan int = c
<-send recv <- 1 }
|
同样,close不能用于接收端
1 2 3 4 5
| func main() { c := make(chan int, 2) var recv <- chan int = c close(recv) }
|
无法将单向通道重新转换回去。
1 2 3 4 5 6 7 8 9
| func main() { var a, b chan int a = make(chan int, 2) var recv <-chan int = a var send chan<- int = a
b = (chan int)(recv) b = (chan int)(send) }
|
选择
如要同时处理多个通道,可选用select
语句。它会随机选择一个可用通道做收发操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| func main() { var wg sync.WaitGroup wg.Add(2)
a, b := make(chan int), make(chan int)
go func() { defer wg.Done()
for { var ( name string x int ok bool )
select { case x, ok = <-a: name = "a" case x, ok = <-b: name = "b" }
if !ok { return }
println(name, x) } }()
go func() { defer wg.Done() defer close(a) defer close(b) for i := 0; i < 10; i++ { select { case a <- i: case b <- i*10: } } }()
wg.Wait() }
|
如要等全部通道消息处理结束(closed),可将已完成通道设置为nil
。这样它就会被阻塞,不再被select
选中。
即使是同一通道,也会随机选择case
执行。
当所有通道都不可用时,select
会执行default
语句。如此可避开select
阻塞,但须注意处理外层循环,以免陷入空耗。
模式
通常使用工厂方法将goroutine
和通道绑定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| type receiver struct { sync.WaitGroup data chan int }
func newReceiver() * receiver { r := &receiver{ data: make(chan int), } r.Add(1) go func(){ defer r.Done() for x := range r.data{ println("recv:", x) } } }
func main() { r := newReceiver() r.data <- 1 r.data <- 2
close(r.data) r.Wait() }
|
鉴于通道本身就是一个并发安全的队列,可用作ID generator
、Pool
等用途。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| type pool chan []byte func new Pool(cap int) pool { return make(chan []byte, cap) }
func (p pool) get() []byte { var v []byte
select { case v = <-p: default: v = make([]byte, 10) } return v }
func (p pool) put(b []byte) { select { case p <- b: default: } }
|
用通道实现信号量(semaphore)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func main() { runtime.GOMAXPROCS(4) var wg sync.WaitGroup sem := make(chan struct{}, 2) for i := 0; i < 5; i++ { wg.Add(1) go func (id int) { defer wg.Done() sem <- struct{}{} defer func() { <-sem}() time.Sleep(time.Second * 2) fmt.Println(id, time.Now()) }(i) } wg.Wait() }
|
标准库time
提供了timeout
和tick channel
实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func main() { go func() { for { select { case <-time.After(time.Second*5): fmt.Println("timeout ...") os.Exit(0) } } }()
go func() { tick := time.Tick(time.Second)
for { select { case <-tick: fmt.Println(time.Now()) } } }() <-(chan struct{})(nil) }
|
捕获INT
、TERM
信号,顺便实现一个简易的atexit
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import ( "os" "os/signal" "sync" "syscall" )
var exits = &struct { sync.RWMutex func []func() signals chan os.Signal }{}
func atexit(f func()){ exits.Lock() defer exits.Unlock() exits.funcs = append(exits.funcs, f) }
func waitExit() { if exits.signals == nil { exits.signals = make(chan os.Signal) signal.Notify(exits.signals, syscall.SIGINT, syscall.SIGTERM) } exits.RLock() for _, f := range exits.funcs { defer f() } exits.RUnlock() <-exit.signals }
func main() { atexit(func() {println("exit1 ...")}) atexit(func() {println("exit2 ...")}) waitExit() }
|
资源泄露
通道可能会引发goroutine leak
, 确切的说,是指goroutine
处于发送或接收阻塞状态,但一直未被唤醒。垃圾回收器并不收集此类资源,导致他们会在等待队列里长久休眠形成资源泄露。
同步
标准库sync
提供了互斥和读写锁,另有原子操作等,可基本满足日常开发需要。Mutex
、RWMutex
的使用并不复杂,只有几个地方需要注意。
将Mutex
作为匿名字段时,相关方法必须实现为pointer-receiver
, 否则会因赋值导致锁机制失效。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| type data struct { sync.Mutex }
func (d data) test(s string) { d.Lock() defer d.Unlock()
for i := 0; i < 5; i++ { println(s, i) time.Sleep(time.Second) } }
func main() { var wg sync.WaitGroup wg.Add(2)
var d data go func() { defer wg.Done() d.test("read") } ()
go func() { defer wg.Done() d.test("write") }() }
|
- 锁不支持递归锁。
- 对性能要求较高时,应避免使用
defer Unlock
- 读写并发时,用
RWMutex
性能会更好一些
- 对单个数据读写保护,可尝试用原子操作
- 执行严格的测试,尽可能打开数据竞争检查