Go Concurrency

Posted by William Basics on Thursday, April 29, 2021

Concurrency in Go

Go提供了两个独特创新的方式来实现并发,他们就是goroutine和channel。

goroutine是Go程序中可执行的最小单元。channel可以使goroutine间的通信变得高效。

在Go中,所有可执行的代码都是通过goroutine,包括程序启动时调用的main。Go被称为并发编程语言由此得名。

并发(Concurrency)和并行(Parallel)

因为都有一个“并”字,所以我们常常混用这两个词。虽然他们都表示同时发生,但其实他们的含义并不完全相同。

并发(Concurrency)是指组件之间可以相互独立运行的一种结构,常常伴随着同步和组件间通信等等。

而并行(Parallel)是指互不干扰,相互平行地运行。

下图出自Erlang之父 Joe Armstrong,可以体会一下。

concurrency and parallel

Goroutine

在Go中,创建和执行goroutine意外地简单,只需要在函数前使用 go 关键词即可。go 后面可接函数名也可接匿名函数。

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	numOfRoutines := flag.Int("n", 10, "Number of goroutines")
	flag.Parse()

	fmt.Println(*numOfRoutines, "routines will be created")

    // 创建和运行 n 个 goroutine。
	for i := 0; i < *numOfRoutines; i++ {
		go func(x int) {
			fmt.Print(x, " ")
		}(i)
	}

	time.Sleep(2 * time.Second) // 在了解同步机制之前,先用sleep方式敷衍一下。
	fmt.Println("\nEnd")
}

输出结果:

10 routines will be created
0 4 6 9 8 7 2 5 1 3 
End

可以看出,goroutine是异步运行且运行顺序是无法保证的。但是有时候我们有需要确定goroutine之间的执行顺序。以下代码将展示如何实现指定顺序执行goroutine。

package main

import (
	"fmt"
	"time"
)

func introduction() {
	fmt.Println("This code shows how to specify the running order of goroutines")
}

func A(a, b chan struct{}) {
	<-a
	fmt.Println("A()")
	time.Sleep(time.Second)
	close(b)
}

func B(a, b chan struct{}) {
	<-a
	fmt.Println("B()")
	close(b)
}

func C(a chan struct{}) {
	<-a
	fmt.Println("C()")
}

func main() {
	introduction()

	a := make(chan struct{})
	b := make(chan struct{})
	c := make(chan struct{})

	go C(c)
	go A(a, b)
	go C(c)
	go B(b, c)
	go C(c)

	close(a)
	time.Sleep(3 * time.Second)
}

运行结果

This code shows how to specify the running order of goroutines
A()
B()
C()
C()
C()

Sync

使用sync 包来实现对goroutine的等待。

package main

import (
	"flag"
	"fmt"

	"sync" // sync package
)

func main() {
	n := flag.Int("n", 20, "Number of goroutines")
	flag.Parse()
	count := *n
	fmt.Printf("Going to create %d goroutines.\n", count)

	var waitGoup sync.WaitGroup //创建一个wait group

	fmt.Println("Start")

	for i := 0; i < count; i++ {
		waitGoup.Add(1)  // wait count递增
		go func(x int) {
			defer waitGoup.Done() // wait count递减。defer的作用相当于C#中的finally,将后面接的语句延后到函数退出时执行。
			fmt.Print(x, " ")
		}(i)
	}

	waitGoup.Wait()
	fmt.Print("\nEnd")
}

.Add() 和 .Done() 的调用次数必须相同,如果add多了,那么会得到

fatal error: all goroutines are asleep - deadlock!

如果done多了,那么会得到

panic: sync: negative WaitGroup counter

Channel

channel是goroutine间交换数据的一种机制。

chan 关键词创建channel。用 close 函数来关闭channel。

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int)

	var wg sync.WaitGroup
	wg.Add(2)

	go Print(ch, &wg)

	go func() {
		for i := 1; i <= 11; i++ {
			ch <- i
		}
		close(ch)
		defer wg.Done()
	}()

	wg.Wait() 
	_, ok := <-ch
	if ok {
		fmt.Println("\nChannel is open!")
	} else {
		fmt.Println("\nChannel is closed!")
	}
}

func Print(ch <-chan int, wg *sync.WaitGroup) {
	for n := range ch { // reads from channel until it's closed
		fmt.Println(n)
	}
	defer wg.Done()
}

运行结果

1 2 3 4 5 6 7 8 9 10 11 
Channel is closed!

如果读取一个close的channel,会得到channel对应数据类型的零值。

package main 
 
import ( 
    "fmt" 
) 
 
func main() { 
    willClose := make(chan int, 10) 
 
    willClose <- -1 
    willClose <- 0 
    willClose <- 2 
 
    <-willClose 
    <-willClose 
    <-willClose 
    
	close(willClose) 
    read := <-willClose 
    fmt.Println(read) 
} 

运行结果

0

Channel在作为函数参数时,还能指定send-only还是receive-only。默认是双向的。

package main

import "fmt"


func ping(pings chan<- string, msg string) {
	pings <- msg
}

func pong(pings <-chan string, pongs chan<- string) {
	msg := <-pings
	pongs <- msg
}

func main() {
	pings := make(chan string, 1)
	pongs := make(chan string, 1)
	ping(pings, "passed message")
	pong(pings, pongs)
	fmt.Println(<-pongs)
}

chan<- 是send-only channel。 <-chan 是receive-only channel。

请记住,channel类型的零值是nil。如果对一个closed的channel发数据,那么程序将panic。但是从closed的channel读取,可以得到channel对应type的零值。因此,channel关闭后,不能往里写,但是可以往外读。

为了能够关闭channel,首先channel不能为receive-only。此外,nil channel总是block的。channel的这个特性非常适合用来disable select语句中的某个分支,只要往那个分支的channel 写入nil即可。

Buffered channel

带缓冲的channel可以让Go 调度器将任务放入队列,从而腾出手来处理其他的请求。另外,带缓冲的channle还能用作semaphore,来限制程序的吞吐。

package main

import (
	"fmt"
	"sync"
	"time"
)

func introduction() {
	fmt.Println("This code shows how a buffered channel works")
}

func timeout(wg *sync.WaitGroup, t time.Duration) bool {
	ch := make(chan int)
	go func() {
		defer close(ch)
		time.Sleep(5 * time.Second)

		wg.Wait()
	}()

	select {
	case <-ch:
		return false
	case <-time.After(t):
		return true
	}
}

func main() {
	introduction()

	nums := make(chan int, 5)
	counter := 10

	for i := 0; i < counter; i++ {
		select {
		case nums <- i:
		default:
			fmt.Println("No enough space for", i)
		}
	}

	for i := 0; i < counter+5; i++ {
		select {
		case num := <-nums:
			fmt.Println(num)
		default:
			fmt.Println("No more to be done")
			break
		}
	}
}

结果:

This code shows how a buffered channel works
No enough space for 5
No enough space for 6
No enough space for 7
No enough space for 8
No enough space for 9
0
1
2
3
4
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done
No more to be done

Nil channel

Nil channel的特性是always block。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func introduction() {
	fmt.Println("This code shows how a nil channel works")
}

func add(c chan int) {
	sum := 0
	t := time.NewTimer(2 * time.Second)

	for {
		select {
		case input := <-c:
			sum += input
		case <-t.C:
			c = nil
			fmt.Println(sum)
		}
	}
}

func send(c chan int) {
	for {
		c <- rand.Intn(10)
	}
}

func main() {
	introduction()

	c := make(chan int)
	go add(c)
	go send(c)

	time.Sleep(3 * time.Second)
}

channel 的 channel

package main

import (
	"fmt"
	"time"
)

func introduction() {
	fmt.Println("This code shows how a channel of channel works")
}

var times int

func f1(cc chan chan int, f chan bool) {
	c := make(chan int)
	cc <- c
	defer close(c)

	sum := 0
	select {
	case x := <-c:
		for i := 0; i <= x; i++ {
			sum += i
		}
		c <- sum
	case <-f:
		return
	}
}

func main() {
	introduction()

	cc := make(chan chan int)
	upper_limit := 10
	for i := 1; i < upper_limit+1; i++ {
		f := make(chan bool)
		go f1(cc, f)
		ch := <-cc
		ch <- i
		for sum := range ch {
			fmt.Print("Sum(", i, ")=", sum)
		}
		fmt.Println()
		time.Sleep(time.Second)
		close(f)
	}
}

(end)