我使用线程池函数构建管道,并在其中传递 context.Context 作为参数。当调用 cancel() 函数或超时到期时,管道必须正常终止,以便没有剩余的工作 goroutine。
我使用的功能:
func generate(amount int) <-chan int {
result := make(chan int)
go func() {
defer close(result)
for i := 0; i < amount; i++ {
result <- i
}
}()
return result
}
func sum(input <-chan int) int {
result := 0
for el := range input {
result += el
}
return result
}
func process[T any, R any](ctx context.Context, workers int, input <-chan T, do func(T) R) <-chan R {
wg := new(sync.WaitGroup)
result := make(chan R)
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case val, ok := <-input:
if !ok {
return
}
select {
case <-ctx.Done():
return
case result <- do(val):
}
}
}
}()
}
go func() {
defer close(result)
wg.Wait()
}()
return result
}
用途:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1200*time.Millisecond)
defer cancel()
input := generate(1000)
multiplied := process(ctx, 15, input, func(val int) int {
time.Sleep(time.Second)
return val * 2
})
increased := process(ctx, 15, multiplied, func(val int) int {
return val + 10
})
fmt.Println("Result: ", sum(increased)) // 360 is ok
fmt.Println("Num goroutine: ", runtime.NumGoroutine()) // 18 is too much
}
我知道发生这种情况是因为所有的increase goroutine都结束了,而multiply goroutine仍在运行。
有没有规范的方法来解决这个问题?
您期望类似结构化并发之类的东西,因此所有 goroutine 都应该在当前作用域的末尾结束,但不要根据您的期望来设计代码。当输入通道未耗尽并且您的
generate
功能不可取消时,您将泄漏 do
。
还有,与
case val, ok := <-input:
if !ok {
return
}
select {
case <-ctx.Done():
return
case result <- do(val):
}
}
当
do(val)
不接受发送时,您将丢失 result
的结果。
“高级 Go 并发模式”中提到了更多内容,但作为一般建议,当您的目标是结构化并发时,我建议先编写同步代码,然后稍后同时运行它们。