实现管道并发模式时出现死锁

问题描述 投票:0回答:2

关于程序:我正在尝试通过我自己的一流类型intJob来实现

pipeline
模式。聚合管道的主要功能是
ExecutePipeline2
,据我所知,正是它导致了问题。

为什么我总是陷入僵局?对我来说,我似乎在每个 goroutine 之后关闭了读者使用的所有通道。另外,使用缓冲区创建通道也没有帮助,所以我已经完全没有想法了,非常感谢您的帮助。

重要:当基础(提到的函数)保持不变且不变时,我无法更改

main
函数并仅从其他函数实现此想法。

type intJob func(a, b chan int)

func ExecutePipeline2(jobs ...intJob) {
    outs := make([]chan int, len(jobs)+1)
    wg := sync.WaitGroup{}

    for i := 0; i < len(outs); i++ {
        outs[i] = make(chan int)
    }

    for i, job := range jobs {
        job := job
        in, out := outs[i], outs[i+1]
        i := i
        wg.Add(1)
        go func() {
            job(in, out)
            fmt.Printf("job %d closed\n", i)
            close(out)
            wg.Done()
        }()
    }

    wg.Wait()
}

func pipe(_, b chan int) {
    for i := 0; i < 5; i++ {
        b <- i
    }
}

func main() {
    inputData := []int{0, 1, 1, 2, 3, 5, 8}

    hashSignJobs := []intJob{
        intJob(func(in, out chan int) {
            for _, fibNum := range inputData {
                out <- fibNum
            }
        }),
        intJob(pipe),
        intJob(func(in, out chan int) {
            for val := range in {
                fmt.Println(val)
            }
        }),
    }

    ExecutePipeline2(hashSignJobs...)
}

go concurrency deadlock
2个回答
2
投票

我认为关键问题在于第二个管道

pipe()
,它不是从前一个管道的输出中读取,而是开始循环生成数字。应该写成下面这样才能从
in
读取为

func pipe(in, out chan int) {
    for i := range in { 
        out <- i
    }
}

0
投票

在此代码片段中创建的第一个“out”(1 个索引)通道

   for i, job := range jobs {
        job := job
        in, out := outs[i], outs[i+1] // here 
        i := i
        wg.Add(1)
        go func() {
            job(in, out)
//...
        }()
    }

不用作“生产者”,并且在第一个

intJob
的第一次迭代之后和第一个
intJob
的第二次迭代中产生死锁(因为
chan
已满):

    hashSignJobs := []intJob{
        intJob(func(in, out chan int) {
            for _, fibNum := range inputData {
                fmt.Printf("1 - %v\n", fibNum)
                out <- fibNum // fatal error: all goroutines are asleep - deadlock!
            }
        }),
// ....

enter image description here

游乐场

# ...

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0000161b0?)
    /usr/local/go-faketime/src/runtime/sema.go:71 +0x25
sync.(*WaitGroup).Wait(0x10100000010?)
    /usr/local/go-faketime/src/sync/waitgroup.go:118 +0x48
main.ExecutePipeline2({0xc000084f20, 0x3, 0x0?})
    /tmp/sandbox346714581/prog.go:31 +0x1d2
main.main()
    /tmp/sandbox346714581/prog.go:59 +0xd5

goroutine 6 [chan send]:
main.main.func1(0x101000000000000?, 0xc00007a0e0)
    /tmp/sandbox346714581/prog.go:48 +0xb6
main.ExecutePipeline2.func1()
    /tmp/sandbox346714581/prog.go:24 +0x3a
created by main.ExecutePipeline2 in goroutine 1
    /tmp/sandbox346714581/prog.go:23 +0xdb
© www.soinside.com 2019 - 2024. All rights reserved.