我正在学习 golang context 包。我有一个包含以下代码的主要 go 例程。
func main() {
defer fmt.Println("main closing...")
wg := sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
applesStream := Generator("apple", ctx)
orangesStream := Generator("orange", ctx)
peachesStream := Generator("peach", ctx)
wg.Add(1)
go func1(ctx, &wg, applesStream)
wg.Add(1)
go genericFunc(ctx, &wg, orangesStream)
wg.Add(1)
go genericFunc(ctx, &wg, peachesStream)
wg.Wait()
}
Generator 函数将无限打印传递给它的字符串值,并在 ctx.Done() 通道收到任何值后关闭。 其中 func1() 和 genericFunc() 如下
func func1[T any](ctx context.Context, parentWg *sync.WaitGroup, stream <-chan T) {
defer fmt.Println("func1 closing...")
defer parentWg.Done()
wg := sync.WaitGroup{}
doWork := func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case val, ok := <-stream:
if !ok {
fmt.Println("channel closed!")
return
}
fmt.Println(val)
}
}
}
newCtx, cancel := context.WithTimeout(ctx, time.Second*2)
defer cancel()
for i := 0; i < 3; i++ {
wg.Add(1)
go doWork(newCtx)
}
wg.Wait()
}
func genericFunc[T any](ctx context.Context, wg *sync.WaitGroup, stream <-chan T) {
defer fmt.Println("genericFunc closing...")
defer wg.Done()
go func() {
for {
select {
case <-ctx.Done():
return
case val, ok := <-stream:
if !ok {
fmt.Println("channel closed!")
return
}
fmt.Println(val)
}
}
}()
}
我希望这段代码在前 3 秒内打印苹果、橙子和桃子,之后它将无限打印橙子和桃子。 但是主 Goroutine 在 3 秒后关闭,这是不应该发生的。
当我在主 go 例程中将 waigGroup 计数器仅增加 1 时,它的行为就如预期的那样。因此,只需将 wg.Add(1)
中的任何一个更改为
wg.Add(2)
即可。所以这意味着在 main go 例程中,在 func1 完成后 waigGroup 计数器不知何故达到零,或者在 func1 完成后 waitGroup 不等待,但我不明白这是如何发生的。
genericFunc
几乎会立即退出,因为在此函数中,您只是启动一个新的 goroutine 来消费来自
stream
通道的消息。因此,在启动主函数后,
wg.Done()
将被调用两次。
func1
中,您创建了一个超时时间为2秒的上下文,因此所有
doWork()
调用将在2秒后退出,
func1
也是如此,这导致
wg.Done()
最后第三次被调用,并且
main
结束。
go func
中的
genericFunc
(即直接在函数中消费消息)和
func1
中的上下文超时。