Go语言8-goroutine和channel

栏目: Go · 发布时间: 5年前

内容简介:Go语言从语言层面上就支持了并发,这与其他语言大不一样。Go语言中有个概念叫做goroutine,这类似我们熟知的线程,但是更轻。进程和线程进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

Goroutine

Go语言从语言层面上就支持了并发,这与其他语言大不一样。Go语言中有个概念叫做goroutine,这类似我们熟知的线程,但是更轻。

进程、线程、协程

进程和线程

进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

一个进程可以创建和撤销多个线程,同一个进程中的多个线程之间可以并发执行。

所以程序的类型可以分为以下几种:

  • 一个进程,它只有一个线程,就是单线程程序
  • 一个进程,它又多个线程,就是多线程程序
  • 一个进程,它可能还会fork多个子进程,就是多进程程序

并发和并行的区别

  • 多线程程序在单核的cou上运行,这是并发(concurrency)。
  • 多线程程序在多个核的cpu上运行(真正的同时运行),这才是并行(parallelism)。

并发,在微观上,任意时刻只有一个程序在运行。因为线程已经是CPU调度的最小单元,一个CPU一次只能处理一个线程。但是宏观上这些程序时同时在那里执行的,所以这个只是并发。

所以在 python 里,貌似讲的都是高并发,似乎没听过并行的概念。

协程和线程

协程,独一的栈空间,共享堆空间,调度由用户自己控制。本质上类似于用户级线程,这些用户级线程的调度也是自己实现的。

线程,一个线程上可以跑多个协程,协程是轻量级的线程。

goroutine 调度模型

Go的调度器模型:G-P-M模型。

  • G代表goroutine,它通过 go 关键字调用runtime.newProc创建。
  • P代表processer,可以理解为上下文。
  • M表示machine,可以理解为操作系统的线程。

设置Golang运行的cpu核数

设置当前的程序运行在多少核上,下面的例子是获取CPU的核数,然后运行在所有核上:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    num := runtime.NumCPU()
    runtim.GOMAXPROCS(num)
    fmt.Println(num)
}

上面P的数目就是这里GOMAXPROCS设置的数目,通常设置为CPU核数。

1.8版本以上的Golang,是不需要做上面的设置的,默认就是运行在所有的核上。当然还是可以设置一下,比如限制只能使用多少核。

goroutine的示例:

package main

import (
    "fmt"
    "time"
)

func example() {
    var i int
    for {
        fmt.Println(i)
        i++
        time.Sleep(time.Millisecond * 30)
    }
}

func main() {
    go example()  // 起一个goroutine
    var j int
    for j > -100 {
        fmt.Println(j)
        j--
        time.Sleep(time.Millisecond * 100)
    }
    fmt.Println("运行结束")
}

Channel

不同goroutine之间要进行通讯,有下面2种方法:

  • 全局变量和锁同步
  • Channel

先讲管道(channel),然后讲 goroutine 和 channel 结合的一些用法。

这篇的channel可以参考下:

https://www.jianshu.com/p/24ede9e90490

全局变量的实现示例

在下面的例子里定义了变量 m 来实现goroutine之间的通讯:

package main

import (
    "fmt"
    "time"
    "sync"
)

var (
    m = make(map[int]uint64)
    lock sync.Mutex
)

type task struct {
    n int
}

func calc(t *task) {
    var res uint64
    res = 1
    for i := 1; i <= t.n; i++ {
        res *= uint64(i)
    }
    lock.Lock()
    m[t.n] = res  // 变量m用来存放结果,这样主线程里就能拿到m的值,操作要加锁
    lock.Unlock()
}

func main() {
    for i := 0; i < 100; i++ {
        t := &task{i}
        go calc(t)
    }
    for j := 0; j < 10; j++ {
        fmt.Printf("\r已运行%d秒", j)
        time.Sleep(time.Second)
    }
    fmt.Println("\r运行完毕,输出结果:")
    lock.Lock()
    for k, v := range m {
        if v != 0 {
            fmt.Printf("%d! = %v\n", k, v)
        }
    }
    lock.Unlock()
}

channel 概念

channel的概念如下:

  • 类型Unix中的管道(Pipe)
  • 先进先出
  • 线程安全,多个goroutine同时访问,不需要加锁
  • channel是有类型的,一个整数的channel只能存放整数

channel 声明

var 变量名 chan 类型

var test1 chan int
var test2 chan string
var tesr3 chan map[string]string
var test4 chan stu
var test5 chan *stu

只是声明还不够,使用前还要make,分配内存空间:

package main

import "fmt"

func main() {
    var intChan chan int  // 声明
    intChan = make(chan int, 10)  // 初始化,长度是10
    intChan <- 10  // 存入管道
    n := <- intChan  // 取出
    fmt.Println(n)
}

定义信号(空结构体)

有一些场景中,一些 goroutine 需要一直循环处理信息,直到收到 quit 信号。作为信号,只需要随便传点什么,并不关注具体的值。那么可以选择使用空结构体,像下面这样定义了2个信号:

msgCh := make(chan struct{})
quitCh := make(chan struct{})
// 传信号的方法
msgCh <- struct{}{}  // 前面的 struct{} 是变量的类型,后面的 {} 则是做初始化传入空值生成实例
quitCh <- struct{}{}

通过channel实现通讯

起一个goroutine往管道里存,再起一个goroutine从管道里把数据取出:

package main

import (
    "fmt"
    "time"
)

func write(ch chan int) {
    var i int
    for {
        ch <- i
        i ++
        time.Sleep(time.Millisecond)
    }
}

func read(ch chan int) {
    for {
        b := <- ch
        fmt.Println(b)
    }
}

func main() {
    intChan := make(chan int, 10)
    go write(intChan)
    go read(intChan)
    time.Sleep(time.Second * 5)
}

channel 的类型和阻塞

channel 分为不带缓存的 channel 和带缓存的 channel。

channel 一定要初始化后才能进行读写操作,否则会永久阻塞。这个不是这里要讲的重点,顺便带一下。

无缓存的channle

初始化make的时候不传入第二个参数设置容量就是:

ch := make(chan int)

从无缓存的 channel 中读取消息会阻塞,直到有 goroutine 向该 channel 中发送消息;同理,向无缓存的 channel 中发送消息也会阻塞,直到有 goroutine 从 channel 中读取消息。

有缓存的 channel

有缓存的 channel 的声明方式为指定 make 函数的第二个参数,该参数为 channel 缓存的容量:

ch := make(chan int, 10)

有缓存的 channel 类似一个阻塞队列(采用环形数组实现)。当缓存未满时,向 channel 中发送消息时不会阻塞,当缓存满时,发送操作将被阻塞,直到有其他 goroutine 从中读取消息;

相应的,当 channel 中消息不为空时,读取消息不会出现阻塞,当 channel 为空时,读取操作会造成阻塞,直到有 goroutine 向 channel 中写入消息。

缓冲区的大小

通过 len 函数可以获得 chan 中的元素个数,通过 cap 函数可以得到 channel 的缓冲区长度。

无缓存和缓冲区是1的差别

无缓存的 channel 的 len和cap 始终都是0。

通过无缓存的 channel 进行通信时,接收者收到数据 happens before 发送者 goroutine 唤醒

上面这句不好理解,不过可以先看下现象。

下面的这2行函数会报错,说是死锁。但是如果设置了 channel 的容量哪怕是1,就不会报错的:

func main() {
    ch := make(chan int)
    ch <- 1
}

虽然容量1的channel也只能存1个数,但是无缓冲区的channel似乎1个数都存不了,除非马上能取走:

func main() {
    ch := make(chan int, 1)
    // 要起一个goroutine可以马上接收channel里的数据
    go func () {
        fmt.Println(<- ch)
    }()
    ch <- 1
    time.Sleep(time.Second)  // 要给goroutine执行完成的时间
}

小结:无缓存的channel需要有一个goroutine可以把channel里的数据马上取走。

channel之间的同步

在学习关闭channel之前,先看下下面的例子。由于没有关闭channel,是会有问题的,不过例子里都解决了。先看下不用关闭channel可以怎么搞,然后再接着看关闭channel的用法:

package main

import (
    "time"
    "fmt"
)

func calc(taskChan chan int, resChan chan int) {
    for v := range taskChan {
        // 判断是不是素数
        flag := true
        for i := 2; i < v; i++ {
            if v % i == 0 {
                flag = false
                break
            }
        }
        if flag {
            resChan <- v
        }
    }
}

func main() {
    intChan := make(chan int, 1000)
    // 这个也是个goroutine
    go func(){
        for i := 2; i < 100000; i++ {
            intChan <- i
        }
    }()  // 管道满了之后,这个匿名函数会阻塞,但是不影响程序继续往下走

    resultChan := make(chan int, 1000)
    // 同时起8个goroutine
    for i := 0; i < 8; i++ {
        go calc(intChan, resultChan)
    }

    // 再起一个取结果的goroutine,不阻塞主线程
    go func(){
        for v := range resultChan{
            fmt.Println("素数:", v)
        }
    }()
    // 给上面的匿名函数几秒钟来输出结果
    time.Sleep(time.Second * 5)
}

上面的例子里用了2个匿名函数,也都是起的goroutine。如果是在主线程里直接for循环的话,那个for循环就会变成死锁,程序不会自己往下走。所以运行在goroutine里的死循环,在主线程退出后也就结束了,不会有问题。后一个匿名函数是对channel的进行遍历,channel取空后,会进入阻塞,如果是运行在主线程里的话也会形成死锁。

range 遍历

channel 也可以使用 range 取值,并且会一直从 channel 中读取数据,直到有 goroutine 对改 channel 执行 close 操作,循环才会结束。

关闭 channel

golang 提供了内置的 close 函数对 channel 进行关闭操作:

ch := make(chan int)
close(ch)

关于 channel 的关闭,有以下的特点:

  • 关闭一个未初始化(nil) 的 channel 会产生 panic
  • 重复关闭同一个 channel 会产生 panic
  • 向一个已关闭的 channel 中发送消息会产生 panic
  • 可以从已关闭的 channel 里继续读取消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息不会阻塞,并且会返回一个为 false 的 ok-idiom,可以用它来判断 channel 是否关闭
  • 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息

有2种方式可以把管道里的数据都取出来,但是都需要把管道关闭:

  • 判断管道已关闭并且取完了
  • 遍历管道

关闭channel然后读取的示例:

package main

import "fmt"

func main() {
    var ch chan int
    ch = make(chan int, 5)
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
    for {
        var b int
        b, ok := <- ch
        fmt.Println(b, ok)
        if ok == false {
            break
        }
    }
}

/* 执行结果
PS H:\Go\src\go_dev\day8\channel\close_chan> go run main.go
0 true
1 true
2 true
3 true
4 true
0 false
PS H:\Go\src\go_dev\day8\channel\close_chan>
*/

上面输出的最后一条,就是channel已经空了,读出来的就是类型的0值,并且ok变false了。

遍历channel的示例:

package main

import "fmt"

func main() {
    var ch chan int
    ch = make(chan int)  // 这个管道没有无缓存
    // 这个goroutine一次存一个,再存会阻塞,直到主线程后面的for循环遍历的时候取走数据
    // 存完100个数后,这里的for循环结束,会关闭管道。主线程后面的for循环的遍历就能正常结束了
    go func () {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        close(ch)
    }()
    for v := range ch {
        fmt.Println(v)
    }
}

判断子线程结束

学到这里,再也不需要用Sleep等待子线程结束了,可以通过管道实现。可以单独定义一个专门用来判断子线程结束的管道。子线程完成任务后,就传个值给管道,主线程就阻塞的读管道里的信息,一旦读到信息,就说明子线程完成了,可以继续执行或者退出了。如果起了多个子线程,则主线程就是用for循环多读几次,就能判断出有多少子线程已经结束了。

channel 只读、只写

声明只读的channel:

var ch <-chan int

声明只写的channel:

var ch chan<- int

应用场景,管道需要能够可读可写。但是可以限制它在某个函数里的功能,也就是在定义函数的参数的时候,把管道的类型设置为只读或只写。 或者把管道传给结构体,结构体里限制管道的读写限制?

下面是之前的一个例子,仅仅只是把2个函数在设置参数类型的时候把管道的读写限制加上了:

package main

import (
    "fmt"
    "time"
)

func write(ch chan<- int) {
    var i int
    for {
        ch <- i
        i ++
        time.Sleep(time.Millisecond)
    }
}

func read(ch <-chan int) {
    for {
        b := <- ch
        fmt.Println(b)
    }
}

func main() {
    intChan := make(chan int, 10)
    go write(intChan)
    go read(intChan)
    time.Sleep(time.Second * 5)
}

配合 select 使用

select 用法类似IO多路复用,可以同时监听多个 channel 的消息状态,用法如下:

select {
    case <- ch1:
    ...
    case <- ch2:
    ...
    case ch3 <- 10;
    ...
    default:
    ...
}

select 可以同时监听多个 channel 的写入或读取:

  • 若只有一个 case 通过(不阻塞),则执行这个 case 块
  • 若有多个 case 通过,则随机挑选一个 case 执行
  • 若所有 case 均阻塞,则执行 default 块。若未定义 default 块,则 select 语句阻塞,直到有 case 被唤醒
  • 使用 break 会跳出 select 块
  • select 不会循环,就只会执行一个块然后就继续往后执行了

select只会执行一次

这个例子只会输出一次,随机是1或者是2,然后接结束了:

package main

import "fmt"

func main() {
    ch1 := make(chan int, 1)
    ch1 <- 1
    ch2 := make(chan int, 1)
    ch2 <- 2
    select {
    case v := <- ch1:
        fmt.Println(v)
    case v := <- ch2:
        fmt.Println(v)
    default:
        fmt.Println(0)
    }
}

所以如果要把管道里的数取完,或者取多次,就要再套一层for循环。

for循环和break的效果

在select外面用for套了一层死循环,这样就是反复的执行select。不过break在这里就没效果了:

package main

import (
    "fmt"
     "time"
)

func main() {
    var ch1, ch2 chan int
    ch1 = make(chan int, 10)
    ch2 = make(chan int, 10)
    for i := 0; i < cap(ch1); i++ {
        ch1 <- i
        ch2 <- i * i
    }

    // LABEL1:
    for {
        select {
        case v := <- ch1:
            fmt.Println("ch1", v)
        case v := <- ch2:
            fmt.Println("ch2", v)
        default:
            fmt.Println("所有元素都已经取完")
            break  // 这个break没有意义,因为值是跳出select,而不是for循环
            // break LABEL1  // 这个break可以直接跳出for循环
        }
        time.Sleep(time.Second)
    }
}

如果要跳出for循环,可以配合标签。上面的代码里已经写好了只是注释掉了。

定时器

定时器是在 time 包里的,

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.NewTicker(time.Second)
    for v := range t.C {
        fmt.Println(v)
    }
}

上面调用的NewTicker()方法返回的是个结构体,如下:

type Ticker struct {
    C <-chan Time // The channel on which the ticks are delivered.
    // contains filtered or unexported fields
}

上面的例子里遍历了 t.C 就是一个channel。time包内部应该是会产生一个goroutine,每隔一段时间就传一个数据进去。

设置超时时间

还有一个After()方法,和上面的方法是一样的。不过这个方法直接返回管道,即 NewTimer(d).C 。而NewTimer()方法的管道在返回的结构体的属性C里。这个After()方法用起来更方便。结合select正好可以做成一个设置任务超时时间的功能:

package main

import (
    "fmt"
    "time"
)

func task(ch chan struct{}) {
    time.Sleep(time.Second * 3)
    ch <- struct{}{}
}

func main() {
    ch := make(chan struct{})  // 定义好信号的管道,传递空结构体
    go task(ch)  // 启动一个任务
    select {
    case <- ch:
        fmt.Println("任务执行结束")
    case <- time.After(time.Second * 2):  // 2秒后超时
        fmt.Println("任务超时")
    }
}

goroutine 中使用 recover

程序里起的gorountine中如果panic了,并且这个goroutine里面没有捕获错误的话,整个程序就会挂掉。

下面的程序会报错(Panic),是gorountine里的产生的错误:

package main

func divideZero(ch chan int) {
    zero := 0
    ch <- 1 / zero
}

func main() {
    ch := make(chan int)
    go divideZero(ch)
    <- ch
}

在gorountine中运行错误了,是可以不影响其他线程和主线程的继续执行的。所以,好的习惯是每当产生一个goroutine,就在开头用defer插入recover, 这样在出现panic的时候,就只是自己退出而不影响整个程序。下面是优化后的代码,加入了recover来捕获错误:

package main

import "fmt"

func divideZero(ch chan int) {
    defer func () {
        if err := recover(); err != nil {
            fmt.Println(err)
            // 要给管道传值,否则主线程从空管道里取值会阻塞,形成死锁
            ch <- 0
        }
    }()
    zero := 0
    ch <- 1 / zero
}

func main() {
    ch := make(chan int)
    go divideZero(ch)
    <- ch
}

单元测试

测试用例的文件名必须以_test.go结尾,测试的函数也必须以Test开头。符合命名规则,使用 go test 命令的时候就能自动运行测试用例。

这篇的单元测试比较粗糙,不过基本怎么用,以及用法示例都简单记下来了。

被测试的函数

先准备一个需要被测试的函数:

package main

import "fmt"

func get_fullname(first, last string) (fullname string) {
    fullname = first + " " + last
    return
} 

func main() {
    fullname := get_fullname("Barry", "Allen")
    fmt.Println(fullname)
}

上面的 get_fullname() 函数就是接下来要进行单元测试的函数。

测试用例

package main

import "testing"

func TestName(t *testing.T) {
    r := get_fullname("Sara", "Lance")
    expect := "Sara Lance"
    if r != expect {
        t.Fatalf("ERROR: get_fullname expect: %s actual: %s", expect, r)
    }
    t.Log("测试成功")
}

执行测试

写完测试用例,就可以执行测试了,使用命令 go test。输出如下:

PS H:\Go\src\go_dev\day8\unit_test\name> go test
PASS
ok      go_dev/day8/unit_test/name      0.058s
PS H:\Go\src\go_dev\day8\unit_test\name>

看到PASS了,但是t.Log()并没有输出,要看到更多信息,要用带上-v参数。使用命令 go test -v ,输出如下:

PS H:\Go\src\go_dev\day8\unit_test\name> go test -v
=== RUN   TestName
--- PASS: TestName (0.00s)
        name_test.go:11: 测试成功
PASS
ok      go_dev/day8/unit_test/name      0.053s
PS H:\Go\src\go_dev\day8\unit_test\name>

直接用go test命令,只显示测试的结果。如果有多个测试用例,也只有一个结果。可以用-v参数看到详细的信息,每个测试用例的的结果都会打印出来。

如果某个测试失败了,就会直接退出,不会继续测试下去。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

推荐系统与深度学习

推荐系统与深度学习

黄昕、赵伟、王本友、吕慧伟、杨敏 / 清华大学出版社 / 2019-1-1 / 65.00元

本书的内容设置由浅入深,从传统的推荐算法过渡到近年兴起的深度学习技术。不管是初学者,还是有一定经验的从业人员,相信都能从本书的不同章节中有所收获。 区别于其他推荐算法书籍,本书引入了已被实践证明效果较好的深度学习推荐技术,包括Word2Vec、Wide & Deep、DeepFM、GAN 等技术应用,并给出了相关的实践代码;除了在算法层面讲解推荐系统的实现,还从工程层面详细阐述推荐系统如何搭建.一起来看看 《推荐系统与深度学习》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具