Concurrency in Go
Go提供了两个独特创新的方式来实现并发,他们就是goroutine和channel。
goroutine是Go程序中可执行的最小单元。channel可以使goroutine间的通信变得高效。
在Go中,所有可执行的代码都是通过goroutine,包括程序启动时调用的main。Go被称为并发编程语言由此得名。
并发(Concurrency)和并行(Parallel)
因为都有一个“并”字,所以我们常常混用这两个词。虽然他们都表示同时发生,但其实他们的含义并不完全相同。
并发(Concurrency)是指组件之间可以相互独立运行的一种结构,常常伴随着同步和组件间通信等等。
而并行(Parallel)是指互不干扰,相互平行地运行。
下图出自Erlang之父 Joe Armstrong,可以体会一下。
Goroutine
在Go中,创建和执行goroutine意外地简单,只需要在函数前使用 go
关键词即可。go
后面可接函数名也可接匿名函数。
package main
import (
"flag"
"fmt"
"time"
)
func main() {
numOfRoutines := flag.Int("n", 10, "Number of goroutines")
flag.Parse()
fmt.Println(*numOfRoutines, "routines will be created")
// 创建和运行 n 个 goroutine。
for i := 0; i < *numOfRoutines; i++ {
go func(x int) {
fmt.Print(x, " ")
}(i)
}
time.Sleep(2 * time.Second) // 在了解同步机制之前,先用sleep方式敷衍一下。
fmt.Println("\nEnd")
}
输出结果:
10 routines will be created
0 4 6 9 8 7 2 5 1 3
End
可以看出,goroutine是异步运行且运行顺序是无法保证的。但是有时候我们有需要确定goroutine之间的执行顺序。以下代码将展示如何实现指定顺序执行goroutine。
package main
import (
"fmt"
"time"
)
func introduction() {
fmt.Println("This code shows how to specify the running order of goroutines")
}
func A(a, b chan struct{}) {
<-a
fmt.Println("A()")
time.Sleep(time.Second)
close(b)
}
func B(a, b chan struct{}) {
<-a
fmt.Println("B()")
close(b)
}
func C(a chan struct{}) {
<-a
fmt.Println("C()")
}
func main() {
introduction()
a := make(chan struct{})
b := make(chan struct{})
c := make(chan struct{})
go C(c)
go A(a, b)
go C(c)
go B(b, c)
go C(c)
close(a)
time.Sleep(3 * time.Second)
}
运行结果
This code shows how to specify the running order of goroutines
A()
B()
C()
C()
C()
Sync
使用sync
包来实现对goroutine的等待。
package main
import (
"flag"
"fmt"
"sync" // sync package
)
func main() {
n := flag.Int("n", 20, "Number of goroutines")
flag.Parse()
count := *n
fmt.Printf("Going to create %d goroutines.\n", count)
var waitGoup sync.WaitGroup //创建一个wait group
fmt.Println("Start")
for i := 0; i < count; i++ {
waitGoup.Add(1) // wait count递增
go func(x int) {
defer waitGoup.Done() // wait count递减。defer的作用相当于C#中的finally,将后面接的语句延后到函数退出时执行。
fmt.Print(x, " ")
}(i)
}
waitGoup.Wait()
fmt.Print("\nEnd")
}
.Add() 和 .Done() 的调用次数必须相同,如果add多了,那么会得到
fatal error: all goroutines are asleep - deadlock!
如果done多了,那么会得到
panic: sync: negative WaitGroup counter
Channel
channel是goroutine间交换数据的一种机制。
用 chan
关键词创建channel。用 close
函数来关闭channel。
package main
import (
"fmt"
"sync"
)
func main() {
ch := make(chan int)
var wg sync.WaitGroup
wg.Add(2)
go Print(ch, &wg)
go func() {
for i := 1; i <= 11; i++ {
ch <- i
}
close(ch)
defer wg.Done()
}()
wg.Wait()
_, ok := <-ch
if ok {
fmt.Println("\nChannel is open!")
} else {
fmt.Println("\nChannel is closed!")
}
}
func Print(ch <-chan int, wg *sync.WaitGroup) {
for n := range ch { // reads from channel until it's closed
fmt.Println(n)
}
defer wg.Done()
}
运行结果
1 2 3 4 5 6 7 8 9 10 11
Channel is closed!
如果读取一个close的channel,会得到channel对应数据类型的零值。
package main
import (
"fmt"
)
func main() {
willClose := make(chan int, 10)
willClose <- -1
willClose <- 0
willClose <- 2
<-willClose
<-willClose
<-willClose
close(willClose)
read := <-willClose
fmt.Println(read)
}
运行结果
0
Channel在作为函数参数时,还能指定send-only还是receive-only。默认是双向的。
package main
import "fmt"
func ping(pings chan<- string, msg string) {
pings <- msg
}
func pong(pings <-chan string, pongs chan<- string) {
msg := <-pings
pongs <- msg
}
func main() {
pings := make(chan string, 1)
pongs := make(chan string, 1)
ping(pings, "passed message")
pong(pings, pongs)
fmt.Println(<-pongs)
}
chan<-
是send-only channel。 <-chan
是receive-only channel。
请记住,channel类型的零值是nil
。如果对一个closed的channel发数据,那么程序将panic。但是从closed的channel读取,可以得到channel对应type的零值。因此,channel关闭后,不能往里写,但是可以往外读。
为了能够关闭channel,首先channel不能为receive-only。此外,nil
channel总是block的。channel的这个特性非常适合用来disable select
语句中的某个分支,只要往那个分支的channel 写入nil
即可。
Buffered channel
带缓冲的channel可以让Go 调度器将任务放入队列,从而腾出手来处理其他的请求。另外,带缓冲的channle还能用作semaphore,来限制程序的吞吐。
package main
import (
"fmt"
"sync"
"time"
)
func introduction() {
fmt.Println("This code shows how a buffered channel works")
}
func timeout(wg *sync.WaitGroup, t time.Duration) bool {
ch := make(chan int)
go func() {
defer close(ch)
time.Sleep(5 * time.Second)
wg.Wait()
}()
select {
case <-ch:
return false
case <-time.After(t):
return true
}
}
func main() {
introduction()
nums := make(chan int, 5)
counter := 10
for i := 0; i < counter; i++ {
select {
case nums <- i:
default:
fmt.Println("No enough space for", i)
}
}
for i := 0; i < counter+5; i++ {
select {
case num := <-nums:
fmt.Println(num)
default:
fmt.Println("No more to be done")
break
}
}
}
结果:
This code shows how a buffered channel works
No enough space for 5
No enough space for 6
No enough space for 7
No enough space for 8
No enough space for 9
0
1
2
3
4
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
Nil channel
Nil channel的特性是always block。
package main
import (
"fmt"
"math/rand"
"time"
)
func introduction() {
fmt.Println("This code shows how a nil channel works")
}
func add(c chan int) {
sum := 0
t := time.NewTimer(2 * time.Second)
for {
select {
case input := <-c:
sum += input
case <-t.C:
c = nil
fmt.Println(sum)
}
}
}
func send(c chan int) {
for {
c <- rand.Intn(10)
}
}
func main() {
introduction()
c := make(chan int)
go add(c)
go send(c)
time.Sleep(3 * time.Second)
}
channel 的 channel
package main
import (
"fmt"
"time"
)
func introduction() {
fmt.Println("This code shows how a channel of channel works")
}
var times int
func f1(cc chan chan int, f chan bool) {
c := make(chan int)
cc <- c
defer close(c)
sum := 0
select {
case x := <-c:
for i := 0; i <= x; i++ {
sum += i
}
c <- sum
case <-f:
return
}
}
func main() {
introduction()
cc := make(chan chan int)
upper_limit := 10
for i := 1; i < upper_limit+1; i++ {
f := make(chan bool)
go f1(cc, f)
ch := <-cc
ch <- i
for sum := range ch {
fmt.Print("Sum(", i, ")=", sum)
}
fmt.Println()
time.Sleep(time.Second)
close(f)
}
}
(end)