jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

在Golang中使用pipeline

作者:jasper | 分类:Golang | 标签:     | 阅读 2021 次 | 发布:2015-08-15 10:44 p.m.

Go的并发可以很容易地有效利用IO和多CPU来创建数据流管道。下面将会讲些利用管道的例子,并介绍对错误的处理。

什么是管道(pipeline)

在Go中对于管道没有正式的定义,它只是很多并发程序中的一种。一般说来,一个管道就是一系列通过channel连接的stages,每个stage都是运行着相同函数的goroutines的集合,在每个stage中,goroutine干着下面三件事儿:

  • 通过channel从上游接收数据;
  • 对这些数据执行一些函数,通常再产生新的数据;
  • 再把数据发送到下游的channel中。

每个stage都有一些上下游channel,除了最开始和最后的stage,最开始的stage通常叫做source或是producer,最后的stage叫做sink或是consumer

下面我们从最简单的例子开始解释,之后会举一些更实用的例子。

来个求平方的例子

这是个由三个stage组成的管道,第一个stage叫gen,是一个函数用来转换一个int类型的list到一个int组成的channel,这个gen函数开始一个goroutine,将int数据发送到channel中,当数据被发送出去的时候,再讲channel关闭。

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二个stage叫sq,从一个channel接收int数据,再求每个接收到数据的平方,并返回一个channel。当流入的channel关闭的时候,这个stage再将数据发到下游,并关闭流出的channel。

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main函数建立这个管道并运行最后的stage,它从第二个stage中接收数据,然后再打印出来,直到channel被关闭。

func main() {
    // 建立pipeline.
    c := gen(2, 3)
    out := sq(c)

    // 消费output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

我们像其他的stage一样也可以用一个循环重写main函数:

func main() {
    // 建立pipeline并消费output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 然后 81
    }
}

输入输出

多个函数可以从相同的channel读取直到channel被关闭,这被叫做fan-out,提供一种方法在一组worker中平衡地分配CPU和IO的利用。

若一个函数可以从多个输入读取数据然后处理直到所有的多个输入的channel关闭,这个被叫做fan-in

我们通过运行从相同的输入channel中读取数据的两个实例sq来改变我们的管道,我们介绍个新的函数merge

func main() {
    in := gen(2, 3)

   // 从两个goroutine中分配sq的工作
    c1 := sq(in)
    c2 := sq(in)

    // 从c1 c2消费merge之后的output
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 然后 9, 或是 9 然后 4
    }
}

merge通过将值拷贝到下游单独的channel中,为每个上游channel开一个goroutine,将一个list的channel转化为一个简单的channel。一旦所以的outputgoroutines都已经开始,merge就开始又一个goroutine去关闭下游channel。

为了防止发送到一个已经关闭的chann而产生panic,所以要保证所有的发送完成都必须在close之前,sysnc.WaitGroup提供了一个简单的方式去解决同步。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)


    // 在cs中为每个输入的channel起一个goroutine,
    // 输出把数据从c中拷贝出去,直到c或是done被关闭,然后调用wg.Done
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 开始一个goroutine,在所有output的goroutine做完之后就关闭。
    // 这必须在wg.Add调用之后开始
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

在我们的pipline函数中,有这样的规则:

  • 当所有的发送操作关闭后,stage就关闭输出;
  • stage会持续从上游接收数据,直到上游channel被关闭。

这些规则允许每个接收stage都可以写成一个loop,来保证goroutine一直存在,直到所有的数据被成功地发送到下游。

但是在实际的pipeline中,stage并不总是接收所有的上游数据,有时会这样设计:接受者可能只需要一部分数据,更经常的情况是,一个stage很早就存在了,因为输入的数据在更早的stage里出错了。在这些情况下,接受者就没有必要一定要等到剩下的数据到达,而且我们希望是在更早的stage里就应该停止生产这些数据。

在我们的示例pipeline中,如果一个stage消费所有的上游数据都失败了,那么发送这些数据的goroutine就会无限期的block。

    // 从output消费第一个值
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 或者 9
    return
    // 因为我们不从out接收第二个值,
    // 所以output的goroutine之一会一直等待发送
}

这里会有资源的泄露:goroutine消耗内存和runtime资源,在goroutine中堆栈防止数据被gc,goroutine不能自己gc,除非它们自己退出。

我们需要让我们的上游stage在它的下游接收数据都失败的时候,能够自动退出。有一种方式是改变下游channel使之拥有一个buffer。这个buffer可以hold一定数量的数据,如果在buff里面没有空间了,发送的操作就立即停止。

c := make(chan int, 2) // buffer size设为2
c <- 1  // 马上成功
c <- 2  // 马上成功
c <- 3  // block直到另一个goroutines does <-c 并且收到 1

当发送的数据量在channel创建的时候就是已知的,buffer可以使代码简单化。例如,我们可以重写gen拷贝一个list的整数到一个buffer中,避免创建一个新的goroutine:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

再次返回pipeline中block的goroutine,我们可能想给下游的channel增加一个buffer,并通过merge函数返回。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... 余下的不变 ...

虽然在程序中解决了goroutine的block问题,但是这是很烂的代码。buffer大小的选择决定于要知道merge函数将会收到的数据的数量。但是这仍然不够,如果我们将另外的值传递到gen中,或是如果下游的stage读取的值更少,那么goroutine还是会被block。

相反,我们需要提供一种方式可以让下游的stage向发送方表明,他们将停止接受输入。

显示地取消

main函数没有从out接收到值而决定退出的时候,它必须告诉stage中的上游goroutine丢掉它们正要发送的数据。它们通过一个叫做done的channel来达到这一点。

func main() {
    in := gen(2, 3)

    // 从两个goroutine中分配sq的工作
    c1 := sq(in)
    c2 := sq(in)

    // 从output消费第一个值
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // 告诉剩下的发送者我们执行结束
    done <- struct{}{}
    done <- struct{}{}
}

发送数据的goroutine用select语句来替换发送操作。当out发生或是从done接收到数据时,select就开始工作了。done的值类型是空的结构体,因为这个值可以是任意的类型。outputgoroutine继续循环上游的channelc,因此上游的stage不会被block。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 在cs中为每个输入的channel起一个goroutine,
    // 输出把数据从c中拷贝出去,直到c或是done被关闭,然后调用wg.Done
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... 余下的不变 ...

这个方法有一个问题,就是每一个下游的接受者都需要知道上游可能block的数量并提前通知发送者。

我们需要一种方式去告诉goroutine停止发送数据到下游。在Go中,我们可以做到这一点,通过关闭一个channel。

这就表示main可以通过关闭done而不用block所有的发送者。这个关闭是一个对发送者的广播信号。我们希望每一个pipeline的函数都接收done作为一个参数,并使用defer让所有从main函数发出的信号的stage都能退出。

func main() {
    //建立done channel,这个channel会在整个pipeline中共享
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // 从两个goroutine中分配sq的工作
    c1 := sq(done, in)
    c2 := sq(done, in)

    // 从output消费第一个值
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done将会被defer关闭
}

每个pipeline的stage在done一被关闭之后都会返回。在merge中的output可以直接返回,因为它知道上游的发送者sq将会停止发送当done被关闭之时。output保证所有通过defer返回的都会调用wg.Done

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 在cs中为每个输入的channel起一个goroutine,
    // 输出把数据从c中拷贝出去,直到c或是done被关闭,然后调用wg.Done
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... 余下的不变 ...

一样地,sqdone被关闭时,也会返回。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

这里是对于pipeline用法的指南:

  • 当所有的发送做完之后,stage将会关闭下游的channel;
  • stage会一直从上游接收数据,直到这些上游channel被关闭或是发送者被block。

结论

在pipeline中处理失败是很麻烦的事情,因为pipeline中的每一个stage都有可能block。所以本文展示了利用关闭一个channel可以广播done信号给所有从pipeline开始的goroutine。


转载请注明出处:http://www.opscoder.info/golang_pipeline.html

其他分类: