《Go语言学习笔记》读书笔记(4)并发

6 分钟阅读

并发

  • 并发: 逻辑上具备同时处理多个任务的能力。
  • 并行: 物理上在同一时刻执行多个并发任务。 多线程或多进程时并行的基本条件,但单线程也可用协程做到并发。尽管协程在单个线程上通过主动切换来实现多任务并发,它也有自己的优势。除了将因阻塞而浪费的时间找回来以外,还免去了线程切换的开销。协程上运行的多个任务本质上是依旧串行的,加上可控自主,所以并不需要做同步处理。 通常情况下,用多进程来实现分布式和负载平衡,减轻单进程垃圾回收压力;用多线程抢夺更多的处理器资源。用协程来提高处理器时间片利用率。

简单将goroutine归纳为协程并不合适。运行时创建多个线程来执行并发任务,且任务单元可被调度到其他线程并行执行。这更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。 只须在函数调用前添加go关键字即可创建并发任务。

go println("hello, world!")

go func(s string) {
  println(s)
} ("hello, world!")

关键字go并非执行并发操作,而是创建一个并发任务单元。新建任务被放置在系统队列中,等待调度器安排合适系统线程去获取执行权。当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行次序。

每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。相比系统默认MB级别的线程栈,goroutinue自定义栈初始仅须2 KB,所以才能创建成千上万的并发任务。自定义栈采取按需分配策略,在需要时进行扩容,最大能到GB规模。 与defer一样,gorountine也会因”延迟执行”而立即计算并复制执行参数。

package main

import (
	"time"
)

var c int

func counter() int {
	c++
	return c
}

func main() {
	a := 100
	go func(x, y int) {
		time.Sleep(time.Second)
		println("go:", x, y)
	}(a, counter())
	a += 100
	println("main:", a, counter())

	time.Sleep(time.Second * 3)	// 等待 `goroutine` 结束
}
/// 输出:
/// main: 200 2
/// go: 100 1

Wait

进程退出时不会等待并发任务结束,可用管道(channel)阻塞,然后发出退出信号。

func main() {
	exit := make(chan struct{})	///< 创建通道。因为仅是通知,数据并没有实际意义

	go func() {
		time.Sleep(time.Second)
		println("goroutine done.")
		close(exit)	///< 关闭通道
	} ()
	println("main...")
	<-exit	///< 如通道关闭,立即解除阻塞
	println("main exit.")
}
/// 输出:
/// main...
/// goroutine done.
/// main exit.

除关闭通道外,写入数据也可解除阻塞。 如要等待多个任务结束,推荐使用sync.WaitGroup。通过设定计数器,让每个goroutine在退出前递减,直至归零时接触阻塞。

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)	///< 累加计数
		go func(id int) {
			defer wg.Done()	///< 递减计数

			time.Sleep(time.Second)
			println("goroutine", id, "done.")
		} (i)
	}

	println("main...")
	wg.Wait()	///< 阻塞,直到计数为零
	println("main exit.")
}
/// 输出:
/// main...
/// goroutine 2 done.
/// goroutine 3 done.
/// goroutine 7 done.
/// goroutine 9 done.
/// goroutine 6 done.
/// goroutine 0 done.
/// goroutine 8 done.
/// goroutine 4 done.
/// goroutine 1 done.
/// goroutine 5 done.
/// main exit.

尽管WaitGroup.Add实现了原子操作,但建议在goroutine外累加计数器,以免Add尚未执行,Wait已经退出。 可在多处使用Wait阻塞,他们都能接收到通知

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		wg.Wait()	///< 等待归零,解除阻塞
		println("wait exit.")
	} ()

	go func() {
		time.Sleep(time.Second)
		println("done.")
		wg.Done()	///< 递减计数
	}()
	wg.Wait()	///< 等待归零
	println("main exit.")

}
/// 输出:
/// done.
/// wait exit.
/// main exit.

GOMAXPROCE

运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。该数量默认与处理器核数相等,可用runtime.GOMAXPROCS函数(或环境变量)修改。

Local Storage

与线程不同,goroutine任务无法设置优先级,无法获取编号,没有局部存储(TLS), 甚至连返回值都会被抛弃。但除优先级外,其他功能都很容易实现。

func main() {
	var wg sync.WaitGroup
	var gs [5]struct {	///< 用于实现类似TLS功能
		id int	///< 编号
		result int	///< 返回值
	}

	for i := 0; i < len(gs); i++ {
		wg.Add(1)

		go func (id int) {	// 使用参数避免闭包延迟求值
			defer wg.Done()

			gs[id].id = id
			gs[id].result = (id + 1) * 100
		}	(i)
	}

	wg.Wait()
	fmt.Printf("%+v\n", gs)
}
/// 输出:
/// [{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}]

如使用map作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查。

Gosched

暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行。

func main() {
	runtime.GOMAXPROCS(1)
	exit := make(chan struct{})

	go func() {
		defer close(exit)
		go func() {
			println("b")
		}()

		for i := 0; i < 4; i++ {
			println("a:", i)

			if (i == 1) {	/// 让出当前线程,调度执行b
				runtime.Gosched()
			}
		}
	}()
	<-exit
}
/// 输出:
/// a: 0
/// a: 1
/// b
/// a: 2
/// a: 3

Goexit

Goexit立即终止当前任务,运行时确保所有已注册延迟调用被执行。该函数不会影响其他并发任务,不会引发panic, 自然也就无法捕获。

func main() {
	exit := make(chan struct{})

	go func () {
		defer close(exit)	///< 执行
		defer println("a")	///< 执行

		func () {
			defer func() {
				println("b", recover() == nil)	///< 执行,recover返回nil
			}()
		func() {	///< 在多层调用中执行Goexit
			println("c")
			runtime.Goexit()	///< 立即终止整个调用堆栈
			println("c done.")
		} ()
		println("b done.")
		}()
		println("a done.")	///< 不会执行
	}()
	<-exit
	println("main exit.")
}
/// 输出:
/// c
/// b true
/// a
/// main exit.

如果在main.main里调用Goexit, 它会等待其他任务结束,然后让进程直接崩溃。 无论身处哪一层,Goexit都能立即终止整个调用堆栈,这与return仅退出当前函数不同。标准库函数os.Exit可终止进程,但不会执行延迟调用。

通道

Go语言并未实现严格的并发安全。 允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致和完整性。Go鼓励使用CSP通道,以通信来代替内存共享,实现并发安全。

CSP: Communicating Sequential Process

通过消息来避免竞态的模型除了CSP, 还有Actor。但两者由较大区别 作为CSP核心,通道(channel)是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可如果另一端未准备妥当,或消息未能及时处理时,会阻塞当前端。 相比起来,Actor是透明的,它不在乎数据类型及通道,只要知道接收者信箱即可。默认就是异步方式,发送方对消息是否被接收和处理并不关心。

从底层实现上来说,通道知识一个队列。同步模式下,发送和接受双方配对,然后直接赋值数据给对方。如配对失败,则置入等待队列,直到另一方出现后才被唤醒。异步模式抢夺的则是数据缓冲槽。发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符时,同样加入缓冲队列,直到有另一方写入数据或腾出空槽后被唤醒。 除传递消息(数据)外,通道还常被用做事件通知。

func main() {
	done := make(chan struct{})
	c := make(chan string)

	go func(){
		s := <-chan	///< 接收消息
		println(s)
		close(done)	///< 关闭通道,作为结束通知
	} ()

	c <- "hi!" 	///< 发送消息
	<-done			///< 阻塞,直到有数据或管道关闭。
}

同步模式必须有配对操作的goroutine出现,否则会一直阻塞。而异步模式在缓冲区未满或数据未读完前,不会阻塞。

func main() {
  c := make(chan int, 3)  ///< 创建带3个缓冲槽的异步通道。
  c <- 1  ///< 缓冲区未满,不会阻塞
  c <- 2
  println(<-c)  ///< 缓冲区不会阻塞
  println(<-c)  ///< 缓冲区不会阻塞
}
/// 输出:
/// 1
/// 2

多数时候,异步通道有助于提升性能,减少排队阻塞。 缓冲去大小仅是内部属性,不属于类型组成部分。另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或nil。 内置函数caplen返回缓冲区大小和当前已缓冲数量;而对于同步通道则都返回0;据此可判断通道时同步还是异步

func main () {
  var a, b := make(chan int), make(chan int, 3)

  b <- 1
  b <- 2

  println("a:", len(a), cap(a))
  println("b:", len(b), cap(b))
}
/// 输出:
/// a: 0 0
/// b: 2 3

收发

除使用简单的发送和接受操作符外,还可用ok-idomrange模式处理数据

func main() {
	done := make(chan struct{})
	c := make(chan int)

	go func() {
		defer close(done)		///< 确保发出结束通知

		for {
			x, ok := <-c
			if !ok {	///< 据此判断通道是否被关闭
				return
			}

			println(x)
		}
	} ()

	c <- 1
	c <- 2
	c <- 3
	close(c)	///< 及时使用`close`函数关闭通道引发结束通知,否则可能会导致死锁。
	<-done
}
/// 输出:
/// 1
/// 2
/// 3

一次性事件用close效率更好,没有多余开销。连续或多样性事件,可传递不同数据标志实现。还可使用sync.Cond实现单播或广播事件。

对于closednil通道,发送和接收操作都有相应规则:

  • 向已关闭通道发送数据,引发panci
  • 从已关闭接收数据,返回已缓冲数据或零值。
  • 无论收发,nil通道都会阻塞。
func main() {
  c := make(chan int, 3)

  c <- 10
  c <- 20
  close(c)

  for i := 0; i < cap(c)+1; i++ {
    x, ok := <-c
    println(i, ":", ok, x)
  }
}

重复关闭或关闭nil通道都会引发panic错误。

单向

通道默认时双向的,并不区分发送和接收端。 尽管可用make创建单向通道,但那没有任何意义。通常使用类型装欢来获取单向通道,并分别赋予操作双方。

func main() {
  var wg sync.WaitGroup
  wg.Add(2)

  c := make(chan int)
  var send chan<- int = c
  var recv <-chan int = c

  go func() {
    defer wg.Done()
    for x := range recv {
      println(x)
    }
  }()

  go func(){
    defer wg.Done()
    defer close(c)

    for i := 0; i < 3; i++ {
      send <- i
    }
  }()

  wg.Wait()
}

不能在单向通道上做逆向操作。

func main(){
  c := make(chan int, 2)

  var send chan<- int = c
  var recv <-chan int = c

  <-send  ///< 无效操作
  recv <- 1 ///< 无效操作
}

同样,close不能用于接收端

func main() {
  c := make(chan int, 2)
  var recv <- chan int = c
  close(recv) ///< 无效操作
}

无法将单向通道重新转换回去。

func main() {
  var a, b chan int
  a = make(chan int, 2)
  var recv <-chan int = a
  var send chan<- int = a

  b = (chan int)(recv)  /// 错误
  b = (chan int)(send)  /// 错误
}

选择

如要同时处理多个通道,可选用select语句。它会随机选择一个可用通道做收发操作。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	a, b := make(chan int), make(chan int)

	go func() {	///< 接收端
		defer wg.Done()

		for {
			var (
				name string
				x	int
				ok bool
			)

			select {	///< 随机选择可用 channel 接收数据
			case x, ok = <-a:
				name = "a"
			case x, ok = <-b:
				name = "b"
			}

			if !ok {	///< 如果任一通道关闭,则终止接收
				return
			}

			println(name, x)	///< 输出接收的数据信息
		}
	}()

	go func() {	///< 发送端
		defer wg.Done()
		defer close(a)
		defer close(b)
		for i := 0; i < 10; i++ {
			select {
			case a <- i:
			case b <- i*10:
			}
		}
	}()

	wg.Wait()
}

/// 输出:
/// a 0
/// b 10
/// a 2
/// a 3
/// a 4
/// a 5
/// b 60
/// a 7
/// a 8
/// a 9

如要等全部通道消息处理结束(closed),可将已完成通道设置为nil。这样它就会被阻塞,不再被select选中。 即使是同一通道,也会随机选择case执行。

当所有通道都不可用时,select会执行default语句。如此可避开select阻塞,但须注意处理外层循环,以免陷入空耗。

模式

通常使用工厂方法将goroutine和通道绑定。

type receiver struct {
  sync.WaitGroup
  data chan int
}

func newReceiver() * receiver {
  r := &receiver{
    data: make(chan int),
  }
  r.Add(1)
  go func(){
    defer r.Done()
    for x := range r.data{
      println("recv:", x)
    }
  }
}

func main() {
  r := newReceiver()
  r.data <- 1
  r.data <- 2

  close(r.data) ///< 关闭通道,发出结束通知
  r.Wait()  ///< 等待接收者处理结束
}

鉴于通道本身就是一个并发安全的队列,可用作ID generatorPool等用途。

type pool chan []byte
func new Pool(cap int) pool {
  return make(chan []byte, cap)
}

func (p pool) get() []byte {
  var v []byte

  select {
    case v = <-p: ///< 获取
    default:  ///< 获取失败,则创建
      v = make([]byte, 10)
  }
  return v
}

func (p pool) put(b []byte) {
  select {
    case p <- b:
    default:  ///< 放回失败,放弃
  }
}

用通道实现信号量(semaphore)

func main() {
  runtime.GOMAXPROCS(4)
  var wg sync.WaitGroup
  sem := make(chan struct{}, 2) ///< 最多允许两个并发同时执行
  for i := 0; i < 5; i++ {
    wg.Add(1)
    go func (id int) {
      defer wg.Done()
      sem <- struct{}{} ///< acquire: 获取信号
      defer func() { <-sem}() ///< release: 释放信号
      time.Sleep(time.Second * 2)
      fmt.Println(id, time.Now())
    }(i)
  }
  wg.Wait()
}

标准库time提供了timeouttick channel实现。

func main() {
  go func() {
    for {
      select {
        case <-time.After(time.Second*5):
          fmt.Println("timeout ...")
          os.Exit(0)
      }
    }
  }()

  go func() {
    tick := time.Tick(time.Second)

    for {
      select {
        case <-tick:
          fmt.Println(time.Now())
      }
    }
  }()
  <-(chan struct{})(nil)  // 直接用nil channel阻塞进程
}

捕获INTTERM信号,顺便实现一个简易的atexit函数。

import (
  "os"
  "os/signal"
  "sync"
  "syscall"
)

var exits = &struct {
  sync.RWMutex
  func []func()
  signals chan os.Signal
}{}

func atexit(f func()){
  exits.Lock()
  defer exits.Unlock()
  exits.funcs = append(exits.funcs, f)
}

func waitExit() {
  if exits.signals == nil {
    exits.signals = make(chan os.Signal)
    signal.Notify(exits.signals, syscall.SIGINT, syscall.SIGTERM)
  }
  exits.RLock()
  for _, f := range exits.funcs {
    defer f() ///< 即使某些函数panic,延迟调用也能确保后续函数执行
  }
  exits.RUnlock()
  <-exit.signals
}

func main() {
  atexit(func() {println("exit1 ...")})
  atexit(func() {println("exit2 ...")})
  waitExit()
}

资源泄露

通道可能会引发goroutine leak, 确切的说,是指goroutine处于发送或接收阻塞状态,但一直未被唤醒。垃圾回收器并不收集此类资源,导致他们会在等待队列里长久休眠形成资源泄露。

同步

标准库sync提供了互斥和读写锁,另有原子操作等,可基本满足日常开发需要。MutexRWMutex的使用并不复杂,只有几个地方需要注意。 将Mutex作为匿名字段时,相关方法必须实现为pointer-receiver, 否则会因赋值导致锁机制失效。

type data struct {
	sync.Mutex
}

func (d data) test(s string) {
	d.Lock()
	defer d.Unlock()

	for i := 0; i < 5; i++ {
		println(s, i)
		time.Sleep(time.Second)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	var d data
	go func() {
		defer wg.Done()
		d.test("read")
	}	()

	go func() {
		defer wg.Done()
		d.test("write")
	}()
}
  • 锁不支持递归锁。
  • 对性能要求较高时,应避免使用defer Unlock
  • 读写并发时,用RWMutex性能会更好一些
  • 对单个数据读写保护,可尝试用原子操作
  • 执行严格的测试,尽可能打开数据竞争检查