关闭具有周期性依赖性的通道

问题描述 投票:0回答:1

我正在尝试在Golang中实现类似于mapreduce的方法。我的设计如下:

  • 地图工作者从映射器输入通道中提取项目并输出到映射器输出通道

  • 然后,通过单个goroutine读取映射器输出通道。该例程维护以前看到的键值对的映射。如果映射器输出的下一项具有匹配键,则它将带有匹配键的新值和旧值都发送到reduce-input通道。

  • reduce-input管道将两个值简化为一个键-值对,并将结果提交到相同的map-output通道。

这将导致映射器输出与reduce输入之间的循环依赖性,我现在不知道如何用信号通知映射器输出已完成(并关闭通道)。

打破这种周期性依赖性或知道何时关闭具有这种周期性行为的通道的最佳方法是什么?

下面的代码在映射输出通道和reduce输入通道彼此等待时发生死锁。

type MapFn func(input int) (int, int)
type ReduceFn func(a int, b int) int

type kvPair struct {
    k int
    v int
}

type reducePair struct {
    k  int
    v1 int
    v2 int
}

func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    reduceInputChan := make(chan *reducePair)
    outputMapMap := make(map[int]int)
    go func() {
        for v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)
    }()
    for i := 0; i < nMappers; i++ {
        go func() {
            for v := range inputMapChan {
                k, v := mapFn(v)
                outputMapChan <- &kvPair{k, v}
            }
        }()
    }
    for i := 0; i < nReducers; i++ {
        go func() {
            for v := range reduceInputChan {
                reduceValue := reduceFn(v.v1, v.v2)
                outputMapChan <- &kvPair{v.k, reduceValue}
            }
        }()
    }
    for v := range outputMapChan {
        key := v.k
        value := v.v
        other, ok := outputMapMap[key]
        if ok {
            delete(outputMapMap, key)
            reduceInputChan <- &reducePair{key, value, other}
        } else {
            outputMapMap[key] = value
        }
    }
    return outputMapMap, nil
}
go concurrency mapreduce channel
1个回答
0
投票

尝试一下:

package main

import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"

type MapFn func(input int) *kvPair
type ReduceFn func(a int, b int) int

type kvPair struct {
    k int
    v int
}

type reducePair struct {
    k  int
    v1 int
    v2 int
}

func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    reduceInputChan := make(chan *reducePair)
    outputMapMap := make(map[int]int)

    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for _, v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)
    }()

    for i := 0; i < nMappers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range inputMapChan {
                outputMapChan <- mapFn(v)
            }
        }()
    }

    finished := false
    go func() {
        wg.Wait()
        finished = true
    }()

    var count int64
    for i := 0; i < nReducers; i++ {
        go func() {
            for v := range reduceInputChan {
                reduceValue := reduceFn(v.v1, v.v2)
                outputMapChan <- &kvPair{v.k, reduceValue}
                atomic.AddInt64(&count, -1)
            }
        }()
    }

    wg2 := sync.WaitGroup{}
    wg2.Add(1)
    go func() {
        defer wg2.Done()
        for {
            select {
            default:
                if finished && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {
                    return
                }
                //runtime.Gosched()
            case v := <-outputMapChan:
                key := v.k
                value := v.v
                if other, ok := outputMapMap[key]; ok {
                    delete(outputMapMap, key)
                    atomic.AddInt64(&count, 1)
                    reduceInputChan <- &reducePair{key, value, other}
                } else {
                    outputMapMap[key] = value
                }
            }
        }
    }()

    wg2.Wait()
    return outputMapMap, nil
}

func main() {
    fmt.Println("NumCPU =", runtime.NumCPU())
    t := time.Now()
    a := rand.Perm(1000000)
    //a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
    m, err := MapReduce(mp, rdc, a, 2, 2)
    if err != nil {
        panic(err)
    }
    fmt.Println(time.Since(t)) //883ms
    fmt.Println(m)
    fmt.Println("done.")
}

func mp(input int) *kvPair {
    return &kvPair{input & 7, input >> 3}
}
func rdc(a int, b int) int {
    b <<= 3
    if a != 0 {
        b |= a
    }
    return b
}
© www.soinside.com 2019 - 2024. All rights reserved.