如何修复管道取消时的 goroutine 泄漏

问题描述 投票:0回答:1

我使用线程池函数构建管道,并在其中传递 context.Context 作为参数。当调用 cancel() 函数或超时到期时,管道必须正常终止,以便没有剩余的工作 goroutine。

我使用的功能:

func generate(amount int) <-chan int {
    result := make(chan int)
    go func() {
        defer close(result)
        for i := 0; i < amount; i++ {
            result <- i
        }
    }()

    return result
}

func sum(input <-chan int) int {
    result := 0
    for el := range input {
        result += el
    }
    return result
}
func process[T any, R any](ctx context.Context, workers int, input <-chan T, do func(T) R) <-chan R {
    wg := new(sync.WaitGroup)
    result := make(chan R)

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-input:
                    if !ok {
                        return
                    }
                    select {
                    case <-ctx.Done():
                        return
                    case result <- do(val):
                    }
                }
            }
        }()
    }

    go func() {
        defer close(result)
        wg.Wait()
    }()
    return result
}

用途:

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1200*time.Millisecond)
    defer cancel()

    input := generate(1000)
    multiplied := process(ctx, 15, input, func(val int) int {
        time.Sleep(time.Second)
        return val * 2
    })
    increased := process(ctx, 15, multiplied, func(val int) int {
        return val + 10
    })

    fmt.Println("Result: ", sum(increased)) //  360 is ok
    fmt.Println("Num goroutine: ", runtime.NumGoroutine())  // 18 is too much
}

我知道发生这种情况是因为所有的increase goroutine都结束了,而multiply goroutine仍在运行。

有没有规范的方法来解决这个问题?

multithreading go pipeline goroutine
1个回答
0
投票

您期望类似结构化并发之类的东西,因此所有 goroutine 都应该在当前作用域的末尾结束,但不要根据您的期望来设计代码。当输入通道未耗尽并且您的

generate
功能不可取消时,您将泄漏
do

还有,与

                case val, ok := <-input:
                    if !ok {
                        return
                    }
                    select {
                    case <-ctx.Done():
                        return
                    case result <- do(val):
                    }
                }

do(val)
不接受发送时,您将丢失
result
的结果。

“高级 Go 并发模式”中提到了更多内容,但作为一般建议,当您的目标是结构化并发时,我建议先编写同步代码,然后稍后同时运行它们

© www.soinside.com 2019 - 2024. All rights reserved.