以下文章来源于陆贵成 ,作者陆贵成
你好,我是陆贵成,欢迎关注。
通常程序会被编写为一个顺序执行并完成一个独立任务的代码,如果没有特殊要求最好总是这样写,因为这种类型的程序通常容易写,也容易维护。
不过有些情况下,并行执行多个任务会有更大的好处,比如 web 服务在各自独立的套接字 socket 上同时接受多个数据请求,它可以显著提高这类系统的性能。
所以 Go 语言的语法和运行时就直接内置了对并发的支持。
Go语言里的并发是指能让一个函数独立于其他函数运行的能力。
当一个函数创建为 goroutine 时,Go 会将其视为一个独立的单元,然后被调度到可用的逻辑处理器上执行。
这个调度器在操作系统之上,将操作系统的线程和语言运行时的逻辑处理器绑定,并在逻辑处理器上运行 goroutine。
首先需要理解操作系统的进程和线程,这有助于理解 Go 语言运行时调度器如何利用操作系统来并发运行 goroutine。
当运行一个应用程序时,操作系统会为这个应用程序启动一个进程,可以理解为分配了一些常用资源的容器,这个进程维护了一些资源,比如内存,文件,设备句柄,线程等。
每个进程至少有一个线程,一个线程就是一个执行空间,这个空间会被操作系统调度来运行函数中所写的代码。进程初始化的线程称为主线程,当主线程终止的时候,应用程序就会终止。
操作系统会在物理处理器上调度线程来运行,而 Go 语言的运行时会在逻辑处理器上调度 goroutine 来运行。
系统调度器有一个全局运行队列,当一个 goroutine 准备运行的时候,这个 goroutine 会放到全局运行队列中,然后调度器就将这个队列中的 goroutine 分配给一个逻辑处理器,然后放在逻辑处理器的本地运行队列中。然后 goroutine 会一直等待到自己被分配的逻辑处理器处理。
当一个 goroutine 发生阻塞时,该 goroutine 和线程会从逻辑处理器中分离,该线程会继续阻塞,然后会创建一个线程重新绑定到该逻辑处理器上,然后继续处理其他的 goroutine。
当阻塞的 goroutine 执行完成并返回,对应的 goroutine 会放回到本地运行队列中,之前的线程保存好,以便以后使用。
希望让 goroutine 并行,必须使用多于一个逻辑处理器。当有多个逻辑处理器时,调度器会将 goroutine 平等分配到每个逻辑处理器上。
这会让 goroutine 在不同的线程上运行。不过要想真的实现并行的效果,用户需要让自己的程序运行在有多个物理器的机器上。
否则,哪怕 Go 语言运行时使用了多个线程,goroutine 依然会在同一个物理处理器上并发运行,达不到并行的效果。
Goroutine 的使用
Go 中使用并发只需要在函数前使用 go 关键字即可,代码如下:
package main
go
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func main() {
fmt.Println("start")
go func() {
fmt.Println("run goroutine") // 不会被执行,因为主线程已经提前结束
}()
fmt.Println("end")
}
但单独使用 goroutine 会导致主线程提前终止,导致 goroutine 中的代码无法被执行到,所以需要结合 sync.WaitGroup 使用。
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func main() {
fmt.Println("start")
// 计数器加一
wg.Add(1)
go func() {
// 当前函数返回时,计数器减一
defer wg.Done()
fmt.Println("run goroutine")
}()
fmt.Println("waiting")
// 阻塞主进程,直到所有 goroutine 运行完毕(计数器为0时)
wg.Wait()
fmt.Println("end")
}
单个逻辑处理器运行多个goroutine
单个逻辑处理器处理多个 goroutine 时,如果某个 goroutine 的占用时间过长,调度器会停止当前的 goroutine ,然后去执行其他可运行的 goroutine。
所以单个逻辑处理器运行多个 goroutine 时,你会看到会交替打印出结果,而不是执行完一个 goroutine,再去执行其他的 goroutine。
package main
import (
"fmt"
"runtime"
"sync"
)
var wg sync.WaitGroup
// 单个逻辑处理器
func main() {
fmt.Println("开始")
// 计数器加2,表示需要等待2个goroutine执行完毕
wg.Add(2)
// 设置当前使用逻辑处理器的数量
runtime.GOMAXPROCS(1)
// 查看当前使用的逻辑处理器数量
//fmt.Println(runtime.GOMAXPROCS(0)) // 1
go printPrime("A")
go printPrime("B")
fmt.Println("等待中")
wg.Wait()
fmt.Println("结束")
// 多次运行,打印的结果顺序可能会不一样,因为并发的时间是不确定的
// 结果大概如下:
// 开始
// 等待中
// B: 2
// ...
// B:3907
// A: 2
// ...
// A: 4999
// B: 3911
// ...
//结束
}
// 查找素数
func printPrime(prefix string) {
defer wg.Done()
next:
for n:=2; n<5000; n++ {
for m:=2; m<n; m++ {
if n%m == 0 {
continue next
}
}
fmt.Printf("%s: %d\n", prefix, n)
}
}
多个逻辑处理器处理多个 goroutine
当有多个逻辑处理器运行多个 goroutine 时,如果代码时在多核物理处理器中运行,那代码的运行就会存在并行,并行时多个 goroutine 时同时在不同的物理处理器上运行,就像以下代码,两个 goroutine 在很短的时间内几乎是同时发生,可以尝试修改 runtime.GOMAXPROCS(1) 使用不同的逻辑处理器来执行代码,如果单个逻辑处理器执行以下代码的话,在很短的时间内时可以先把一个 goroutine 执行完毕的,所以会看到打印出来的结果是分开的,并不会几个 goroutine 的结果混在一起。
package main
import (
"fmt"
"runtime"
"sync"
)
var wg sync.WaitGroup
// 多个逻辑处理器
func main() {
fmt.Println("开始")
// 设置逻辑处理去使用当前机器的逻辑处理器数量, 默认就是使用当前机器支持的最大逻辑处理器数量
runtime.GOMAXPROCS(runtime.NumCPU())
// 可以尝试不同逻辑处理器比较运行结果
//runtime.GOMAXPROCS(1)
// 查看当前逻辑处理器数量
//fmt.Println(runtime.GOMAXPROCS(0))
wg.Add(2)
go printLetter('a')
go printLetter('A')
fmt.Println("等待")
wg.Wait()
fmt.Println("\n结束")
// 开始
// 等待
// A B C D E F a b c d e f g h i j k l m n o p q r s t u G v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q H I J K L M N O P Q R S T U V W X Y r s t u v w x y z Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
// 结束
}
// 打印字母
func printLetter(firstLetter rune) {
defer wg.Done()
for count:=0; count < 3; count++ {
for char := firstLetter; char < firstLetter + 26; char++ {
fmt.Printf("%c ", char)
}
}
}
竞争状态
如果两个或多个 goroutine 在没有相互同步的情况下,访问某个共享的资源,并试图同时读或写这个资源,就处于相互竞争的状态,这种情况就称为竞争状态。
对一个共享资源的读和写应该是原子化的,也就是说,同一时刻只能有一个 goroutine 对共享资源进行读和写的操作。
使用 go build -race(竞争检测器)来检查竞争状态,执行 go build -race 后,会在项目目录下生产一个名称为 项目名称的 可执行文件。执行该可执行文件,可以在控制台看到竞争状态的关系。
可以有3中方法解决资源竞争状态的问题:
原子函数
加互斥锁
使用通道 channel
使用 runtime.Gosched() 退出当前线程,使资源竞争更加明显。
以下的代码就存在竞争状态,共享变量 number 在两个 goroutine 执行后,结果应该执行了4次,正确结果应该是 4,可是运行代码后,有时会得到2,3,4 三种不同的结果。
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
// 竞争状态:同一时间访问同一个共享资源的状态
number int64
wg sync.WaitGroup
)
func main() {
fmt.Println("开始")
wg.Add(2)
// 查看当前逻辑处理器数量
//fmt.Println(runtime.GOMAXPROCS(0))
go incCounter()
go incCounter()
fmt.Println("等待")
wg.Wait()
fmt.Println("\n结束", number)
}
func incCounter() {
defer wg.Done()
for n:=0; n<2; n++ {
value := number
// 当前 goroutine 从线程退出,并放回队列
runtime.Gosched()
value++
number = value
}
}
原子函数
使用 sync/atomic 这个包的 AddInt64() 来解决竞争问题
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
// 竞争状态:同一时间访问同一个共享资源的状态
number int64
wg sync.WaitGroup
)
func main() {
fmt.Println("开始")
wg.Add(2)
// 查看当前逻辑处理器数量
//fmt.Println(runtime.GOMAXPROCS(0))
go incCounter()
go incCounter()
fmt.Println("等待")
wg.Wait()
fmt.Println("\n结束", number)
}
func incCounter() {
defer wg.Done()
for n:=0; n<2; n++ {
atomic.AddInt64(&number, 1)
// 当前 goroutine 从线程退出,并放回队列
runtime.Gosched()
}
}
还有两个比较有用的原子函数:atomic.StoreInt64() 和 atomic.LoadInt64()
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var wg sync.WaitGroup
// 原子函数 之 整形状态同步,LoadInt64, StoreInt64
func main() {
wg.Add(2)
go doWork("A")
go doWork("B")
time.Sleep(1 * time.Second)
fmt.Println("Shutdown now!")
// 设置 shutdown 标志
atomic.StoreInt64(&shutdown, 1)
wg.Wait()
}
var shutdown int64
func doWork(name string) {
defer wg.Done()
for {
fmt.Printf("%s Doing\n", name)
time.Sleep(250 * time.Millisecond)
// 获取 shutdown 值
if atomic.LoadInt64(&shutdown) == 1 {
fmt.Printf("Shutting %s down\n", name)
break
}
}
}
加互斥锁
使用 var mutex sync.Mutex 互斥锁来解决资源竞争问题, mutex.Lock() 和 mutex.Unlock() 的使用。
package main
import (
"fmt"
"runtime"
"sync"
)
var (
// 互斥锁
mutex sync.Mutex
wg sync.WaitGroup
number int
)
func main() {
wg.Add(2)
go incCounter()
go incCounter()
wg.Wait()
fmt.Println("\n结束", number)
}
func incCounter() {
defer wg.Done()
for n:=0; n<2; n++ {
// 加锁
mutex.Lock()
value := number
// 当前 goroutine 从线程退出,并放回队列
runtime.Gosched()
value++
number = value
// 解锁
mutex.Unlock()
}
}
通道 channel
在 goroutine 之间同步和传递数据可以使用通道 channel 这种数据类型。通道又分为无缓冲通道(同步通道)和 有缓冲通道。
无缓冲通道要求发送和接收同时准备好,才能完成发送和接收操作,如果发送和接收都没有准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。其中任意一个操作都无法离开另一个单独存在。
以下是无缓冲通道的示例,模拟两个运动员之间的传球,展示两个goroutine 使用无缓冲通道来传递数据。
package main
import (
"fmt"
"math/rand"
"sync"
)
var (
wg sync.WaitGroup
number int
)
// 无缓冲通道
func main() {
count := make(chan int)
wg.Add(2)
go player("A", count)
go player("B", count)
count <- 1
wg.Wait()
}
func player(name string, count chan int) {
defer wg.Done()
for {
ball, ok := <-count
// ok 为 false 则表示通道已经关闭
if !ok {
fmt.Printf("Player %s won\n", name)
return
}
// 随机数
n := rand.Intn(100)
// 模拟接不到球
if n%13 == 0 {
fmt.Printf("Player %s missed\n", name)
close(count)
return
}
fmt.Printf("Player %s hit %d\n", name, ball)
ball++
count <- ball
}
}
有缓冲通道,是一种在被接收前能存储一个或多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送或接收。
通道中没有数据时,接收动作才会阻塞;通道中已经存满时,发送动作才会阻塞。
通道关闭后,不能再继续发送数据到通道内,但可以继续从通道中获取数据。
从一个已经关闭且没有数据的通道获取数据,总会立刻返回,并返回通道类型的零值。
以下是有缓冲通道的使用示例
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var (
// 定义 goroutine 数量
goroutineCount = 4
// 任务数
taskCount = 10
wg sync.WaitGroup
)
func main() {
// 有缓冲通道 类型为 string, 缓冲数 taskCount 为 10
tasks := make(chan string, taskCount)
wg.Add(goroutineCount)
for gr:=1; gr<=goroutineCount; gr++{
// 开启多个 goroutine 执行任务
go worker(tasks, gr)
}
// 存放任务到任务通道中
for task :=1; task <= taskCount; task++ {
tasks <- fmt.Sprintf("Task %d\n", task)
}
// 执行完所有的 goroutine 后,关闭通道
close(tasks)
// 阻塞主线程,等到所有 goroutine 执行完毕在往下执行
wg.Wait()
}
func worker(tasks chan string, worker int) {
defer wg.Done()
for {
// 从任务通道中取任务 和 通道 是否关闭的状态
task, ok := <-tasks
// 通道已经关闭
if !ok {
fmt.Printf("Worker %d: shutdown\n", worker)
return
}
// 模拟工作
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep) * time.Millisecond)
fmt.Printf("Worker %d started %s", worker, task)
// 完成工作
fmt.Printf("Worker %d completed %s\n", worker, task)
}
}
推荐阅读