前言

学习Go语言也有一段时间了,Go带给我的最直观的感受就是强大,简洁的并发能力。
于是乎,Go在Web等需要并发的地方大方光彩。我还记得第一次用Go写出一个httpServer是,
简直不敢相信如此简单明了。这可能是我接触语言较少有关系吧,反正如果是用C语言来写的花,
我估计还在为Socket监听,和多线程困恼。反正Go带给我的就是怎么酸爽!

Goroutine

Go语言中,每一个并发的活动成为goroutine,创建一个goroutine只需要在调用的函数前面加上关键字go,即可创建一个协程。
goroutine类型线程,但是比线程过于轻量,称为协程

这里给出书本上面的示例,实现一个并发的时钟服务器。

func main(){
    lis,err := net.Listen("tcp","localhost:8080")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn,err := lis.Accept()
        if err != nil {
            log.Println(err)
        }
        go func(){
            defer conn.Close()
            for {
                _,err := io.WriteString(conn, time.Now().Format("15:04:05\n"))
                if err != nil {
                    return 
                }
                time.Sleep(1*time.Second)
            }
        }(conn net.Conn)
        /*
        这里不要使用内部函数来捕获变量,申明一个参数,将需要的变量传递进来。
        */
    }
}

这里唯一需要关注的就是go启动的匿名函数,这个函数处理每一个连接请求,然后每秒钟发送一次格式化的时间。
仅仅加上了go,就让简易的Server有了并发能力。

通道

对于多个协程,它们可能做着互相独立不干扰的工作,但也可能协程之间需要密切配置,来共同完成同样的任务,比如说支持并发的WebSpider,可能有许多的goroutine不辞劳苦地爬爬爬,
但是改如何控制它们的整体行为,比如说某个goroutine爬过的url,其它的就不用再爬了。又或者说,用户想要停止,如何通知每个goroutine,让它们停下手中的活,乖乖的退出呢?

这个时候或许通道就派上用场了,这也是Go语言推荐的方案。

通道是可以让一个goroutine发送特定值到另一个goroutine的通信机制。每一个通道是一个具体类型的导管,叫做通道的元素类型

通道使用内置函数make来创建,通道类似map传递的是引用,零值是nil

  • ch := make(chan int) //创建一个通道

通道支持发送 接送关闭操作,前两个是通信,有点类似socket,在发送和接受的时候,若缓冲区满/空,则会进入阻塞状态,关闭后的通道,只允许会宕机。

  • ch <- x //发送
  • x = <-ch //接送&赋值
  • <- ch //仅接送
  • close(ch) //关闭一个通道

无缓冲通道

通道在创建的时候可以指定缓存的大小,默认的没有缓存。此时对其的读写操作都会造成阻塞,因此需要至少两个goroutine,一个负责读,另一个负责写,完成goroutine之间的通信。

没有缓冲的通道的特性实现两个goroutine同步化,因此又称为同步通道。这个时候若仅仅作为同步使用,一般会创建make(chan struct{})这个的通道以此强调。

管道 && 单向通道

听名字是不是很熟悉,类似shell里面的管道概念,例如ps aux|grep XX我们就使用到管道,一个命令的输出作为另一个命令的输入。
Go的通道很容易实现管道的功能,下面是一个管道的示例

func main(){
    naturals := make(chan int)
    squares := make(chan int)

    //计数器
    go func(){
        for x:=0 ; x<100 ; x++ {
            naturals <- x
        }
        close(naturals) //通知接收方停止
    }()
    
    //平方产生器
    go func(){
        for x := range naturals {
            squares <- x*x
        }
        close(squares) //同上
    }()

    //打印器
    for x := range squares {
        fmt.println(x)
    }
}

上面的例程有点类似流水生产线,上游->中游->下游,上游只负责写,中游负责读写,下游负责读。

对于明确知道只使用管道的读写之一的,我们可以写成函数,写成单通道的参数。

  • func counter(out chan<- int) //只写
  • func squarer(out chan<- int, in <-chan int)
  • func printer(in <-chan int)

这样若在函数内部错误的读一个只写的通道,编译器就会报错,可以帮助我们及早发现问题。

缓存通道

在创建的通道的时候指定大于零的容量,通道就变成了缓存通道缓存通道队列的特性。

缓存通道大小,容量可以使用len,cap内置函数来获取。

Select多路复用

当一个goroutine需要处理多个通道的时候,我们不能让一个某个通道阻塞我们的goroutine从而忽略了其它通道的处理,这个时候select就排上用场了(有点类似socket的轮询操作)。

select {
    case <-ch1:

    case x := <-ch2:

    case ch3 <- y:

    //可选
    default:
    //当default出现时,select永不阻塞,相当于轮询
    //没有default时,若所有case都是阻塞状态,这select会阻塞,
    //有任一case得到响应,select则进入相应的case执行代码
    //若同时有多个case满足条件,select随机选择一个进入
}

由于通道的可以是nil,零值,对零值的通道进行读写将会永远阻塞,
因此select中的零值case将永不会选中,这个特性可以用来开启/关闭一些功能。

示例

来个示例,加深一下理解

并发目录遍历


var verbose = flag.Bool("v", false, "Show verbose progress message")
var done = make(chan struct{}) //标识是否停止

func main(){
    flag.Parse()
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }
    go func(){
        os.Stdin.Read(make([]byte,1)) //任意按键停止程序
        close(done)
    }()

    fileSize := make(chan int64) //接受文件大小
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)
        go walkDir(root, &n, fileSize)
    }
    go func(){
        n.Wait()
        close(fileSize)
    }()

    var tick <-chan time.Time
    if *verbose {
        tick = time.Tick(500*time.Millisecond) //500毫秒显示一次
    }
    
    var nfiles, nbytes int64
loop:
    for {
        select{
            case <-done:
                for range fileSize{} //吃空所有goroutine
                return
            case size,ok := <-fileSize:
                if !ok { 
                    break loop  //跳出for循环
                }
                nfiles ++
                nbytes += size
            case <-tick.C:  //定时任务
                printDiskUsage(nfiles,nbytes)
        }
    }
    printDiskUsage(nfiles, nbytes)
    fmt.Println(nbytes)
}

func printDiskUsage(nfiles,nbytes int64){
    fmt.Printf("du: %d files, %s\n",nfiles,formatUnit(nbytes))
}

func formatUnit(nbytes int64) string {
    units := []string{"bytes","Kb","Mb","Gb","Pb"}
    index := 0
    var unit = units[index]
    var b = float64(nbytes)
    for b>1e3 && index<len(units) {
        index ++
        unit = units[index]
        b /= 1e3
    }
    return fmt.Sprintf(".2f %s",b,unit)
}

func isCanceled() bool {
    select{
        case <-done:
            return true
        default:
            return false
    }
}

func walkDir(dir string, n *sync.WaitGroup, fileSize <-chan int64){
    defer n.Done()
    if isCanceled() {
        return
    }
    for _,entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subDir := filePath.Join(dir,entry.Name())
            go walkDir(subDir,n,fileSize)
        } else {
            fileSize <- entry.Size()
        }
    }
}

var sema = make(chan struct{}, 20) //最多20个goroutine

func dirents(dir string) []os.FileInfo {
    select {
        case sema <- struct{}{}: //获取令牌
        case <-done:
            return nil
    }
    defer func(){ <-sema }() // 释放令牌

    entries,err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr,"du: %v\n", err)
        return nil
    }
    return entries
}

这个示例解决的问题

  • 限制goroutine的数量(20个)
  • 等待goroutine的结束
  • 快速响应的终止
  • 特性的开关

结尾

goroutine可谓是Go的一大法宝,配合通道可谓锦上添花。

以后可能会有许多的坑,而且这些东西我的理解还不够。

需要学习的东西还好多,比如说并发安全,变量共享等等。

Last modification:January 22nd, 2020 at 05:52 pm
要饭啦~