使用 goroutine 处理值并将结果收集到切片中

问题描述 投票:0回答:3

我最近正在探索 Go,goroutine 的工作原理让我很困惑。

我尝试使用 goroutine 将之前编写的代码移植到 Go 中,但出现了

fatal error: all goroutines are asleep - deadlock!
错误。

我想做的是使用 goroutine 处理列表中的项目,然后将处理后的值收集到一个新列表中。但我在“收集”部分遇到了问题。

代码:

sampleChan := make(chan sample)
var wg sync.WaitGroup

// Read from contents list
for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()

// Read from sampleChan and put into a slice
var sampleList []sample
for s := range sampleChan {
    sampleList = append(sampleList, s)
}
close(sampleChan)

从 goroutine 收集结果的正确方法是什么?

我知道切片不是线程安全的,所以我不能让每个 goroutine 都附加到切片上。

go slice goroutine
3个回答
22
投票

您的代码几乎是正确的。有几个问题:首先,您在收集结果之前等待所有工作人员完成,其次,当通道关闭时,您的

for
循环终止,但通道仅在
for
循环终止后才关闭.

您可以通过在工作人员完成时异步关闭通道来修复代码:

for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}

go func() {
    wg.Wait()
    close(sampleChan)
}()

for s := range sampleChan {
  ..
}

作为风格注释(并遵循https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions),如果

newSample
是一个简单的同步函数,那就更好了t 获取等待组和通道,并简单地生成其结果。那么工作代码将如下所示:

for i, line := range contents {
    wg.Add(1)
    go func(line string) {
        defer wg.Done()
        sampleChan <- newSample(line, *replicatePtr, *timePtr)
    }(line)
}

这使您的并发原语保持在一起,除了简化

newSample
并使其更容易测试之外,它还允许您查看并发发生了什么,并直观地检查
wg.Done()
是否始终被调用。如果您想重构代码以例如使用固定数量的工作人员,那么您的更改将全部是本地的。


7
投票

有两个问题

  1. 使用无缓冲通道:无缓冲通道会阻塞接收器,直到通道上有数据可用,并阻塞发送器,直到接收器可用。这导致了错误
  2. 在范围之前不关闭通道:由于您从未关闭 ch 通道,因此范围循环将永远不会完成。

您必须使用

buffered
通道和
close
范围之前的通道

代码

package main

import (
    "fmt"
    "sync"
)

func double(line int, ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    ch <- line * 2

}

func main() {
    contents := []int{1, 2, 3, 4, 5}
    sampleChan := make(chan int,len(contents))
    var wg sync.WaitGroup
    // Read from contents list
    for _, line := range contents {
        wg.Add(1)
        go double(line, sampleChan, &wg)
    }
    wg.Wait()
    close(sampleChan)
    // Read from sampleChan and put into a slice
    var sampleList []int

    for s := range sampleChan {
        sampleList = append(sampleList, s)
    }

    fmt.Println(sampleList)
}

播放链接:https://play.golang.org/p/k03vt3hd3P

编辑: 另一种获得更好性能的方法是同时运行

producer
consumer

修改代码

package main

import (
    "fmt"
    "sync"
)

func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
    defer wg.Done()

    defer close(sampleChan)
    var w sync.WaitGroup
    for _, line := range lines {
        w.Add(1)
        go double(&w, line, sampleChan)
    }
    w.Wait()
}

func double(wg *sync.WaitGroup, line int, ch chan int) {
    defer wg.Done()
    ch <- line * 2
}

func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
    defer wg.Done()
    for s := range channel {
        *sampleList = append(*sampleList, s)
    }

}

func main() {
    contents := []int{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
    sampleChan := make(chan int, 1)
    var sampleList []int

    var wg sync.WaitGroup

    wg.Add(1)
    go doubleLines(contents, &wg, sampleChan)
    wg.Add(1)
    go collectResult(&wg, sampleChan, &sampleList)
    wg.Wait()
    fmt.Println(sampleList)
}

播放链接:https://play.golang.org/p/VAe7Qll3iVM


0
投票

假设每个输入都有一个结果,那么通道或等待组足以解决问题。 两者都没有必要。

选项 1:消除等待组。 每个输入都会收到一个结果。

sampleChan := make(chan sample)

// Read from contents list
for i, line := range contents {
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan)
}

// Read from sampleChan and put into a slice
var sampleList []sample
for range contents {
    sampleList = append(sampleList, <-sampleChan)
}

选项 2:消除通道。 分配足够大小的切片并将 goroutine 的结果写入切片。修改

newSample
以返回结果,而不是将结果发送到通道。通过这种方法,
sampleList
中结果的顺序与
contents
中输入的顺序相匹配。

var wg sync.WaitGroup
sampleList := make([]sample, len(contents))

for i, line := range contents {
    go func(i int, line string) {
       sampleList[i] = newSample(line, *replicatePtr, *timePt)
    }(i, line)
}
wg.Wait()
fmt.Println(sampleList)
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.