我正在尝试在Golang中实现类似于mapreduce的方法。我的设计如下:
地图工作者从映射器输入通道中提取项目并输出到映射器输出通道
然后,通过单个goroutine读取映射器输出通道。该例程维护以前看到的键值对的映射。如果映射器输出的下一项具有匹配键,则它将带有匹配键的新值和旧值都发送到reduce-input通道。
这将导致映射器输出与reduce输入之间的循环依赖性,我现在不知道如何用信号通知映射器输出已完成(并关闭通道)。
打破这种周期性依赖性或知道何时关闭具有这种周期性行为的通道的最佳方法是什么?
下面的代码在映射输出通道和reduce输入通道彼此等待时发生死锁。
type MapFn func(input int) (int, int)
type ReduceFn func(a int, b int) int
type kvPair struct {
k int
v int
}
type reducePair struct {
k int
v1 int
v2 int
}
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
inputMapChan := make(chan int, len(input))
outputMapChan := make(chan *kvPair, len(input))
reduceInputChan := make(chan *reducePair)
outputMapMap := make(map[int]int)
go func() {
for v := range input {
inputMapChan <- v
}
close(inputMapChan)
}()
for i := 0; i < nMappers; i++ {
go func() {
for v := range inputMapChan {
k, v := mapFn(v)
outputMapChan <- &kvPair{k, v}
}
}()
}
for i := 0; i < nReducers; i++ {
go func() {
for v := range reduceInputChan {
reduceValue := reduceFn(v.v1, v.v2)
outputMapChan <- &kvPair{v.k, reduceValue}
}
}()
}
for v := range outputMapChan {
key := v.k
value := v.v
other, ok := outputMapMap[key]
if ok {
delete(outputMapMap, key)
reduceInputChan <- &reducePair{key, value, other}
} else {
outputMapMap[key] = value
}
}
return outputMapMap, nil
}
尝试一下:
package main
import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"
type MapFn func(input int) *kvPair
type ReduceFn func(a int, b int) int
type kvPair struct {
k int
v int
}
type reducePair struct {
k int
v1 int
v2 int
}
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
inputMapChan := make(chan int, len(input))
outputMapChan := make(chan *kvPair, len(input))
reduceInputChan := make(chan *reducePair)
outputMapMap := make(map[int]int)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for _, v := range input {
inputMapChan <- v
}
close(inputMapChan)
}()
for i := 0; i < nMappers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for v := range inputMapChan {
outputMapChan <- mapFn(v)
}
}()
}
finished := false
go func() {
wg.Wait()
finished = true
}()
var count int64
for i := 0; i < nReducers; i++ {
go func() {
for v := range reduceInputChan {
reduceValue := reduceFn(v.v1, v.v2)
outputMapChan <- &kvPair{v.k, reduceValue}
atomic.AddInt64(&count, -1)
}
}()
}
wg2 := sync.WaitGroup{}
wg2.Add(1)
go func() {
defer wg2.Done()
for {
select {
default:
if finished && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {
return
}
//runtime.Gosched()
case v := <-outputMapChan:
key := v.k
value := v.v
if other, ok := outputMapMap[key]; ok {
delete(outputMapMap, key)
atomic.AddInt64(&count, 1)
reduceInputChan <- &reducePair{key, value, other}
} else {
outputMapMap[key] = value
}
}
}
}()
wg2.Wait()
return outputMapMap, nil
}
func main() {
fmt.Println("NumCPU =", runtime.NumCPU())
t := time.Now()
a := rand.Perm(1000000)
//a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
m, err := MapReduce(mp, rdc, a, 2, 2)
if err != nil {
panic(err)
}
fmt.Println(time.Since(t)) //883ms
fmt.Println(m)
fmt.Println("done.")
}
func mp(input int) *kvPair {
return &kvPair{input & 7, input >> 3}
}
func rdc(a int, b int) int {
b <<= 3
if a != 0 {
b |= a
}
return b
}