我在使用 MapReduce 功能时遇到了问题。
现在我已经知道如何统计单词出现的次数:
from mrjob.job import MRJob
class MRWordCounter(MRJob):
    """Job that counts occurrences of each whitespace-separated word."""

    def mapper(self, key, line):
        """Emit a ``(word, 1)`` pair for every token in ``line``.

        ``key`` is accepted but not used.
        """
        tokens = line.split()
        for token in tokens:
            yield token, 1

    def reducer(self, word, occurrences):
        """Emit ``word`` paired with the sum of the counts in ``occurrences``."""
        total = sum(occurrences)
        yield word, total


if __name__ == '__main__':
    # Launch the job only when this file is executed directly, not on import.
    MRWordCounter.run()
但如果问题换成这样:我必须打印出每个单词出现在哪些行中,该怎么办?假设我有一个包含以下数据的文件:
line 1- goat,chicken,horse
line 2- cat,horse
line 3- dog,cat,sheep
line 4- buffalo,dolphin,cat
line 5- sheep
所以输出应该像这样
"buffalo" ["buffalo,dolphin,cat"]
"cat" ["buffalo,dolphin,cat", "cat,horse", "dog,cat,sheep"]
"chicken" ["goat,chicken,horse"]
"dog" ["dog,cat,sheep"]
"dolphin" ["buffalo,dolphin,cat"]
"goat" ["goat,chicken,horse"]
"horse" ["cat,horse", "goat,chicken,horse"]
"sheep" ["dog,cat,sheep", "sheep"]
在 `mapper` 中使用 `yield word, line`,在 `reducer` 中使用 `yield word, occurrences`,这样就会生成 `word [line, otherline]` 形式的输出。
from mrjob.job import MRJob
class MRWordCounter(MRJob):
    """Job that maps each comma-separated word to the lines containing it."""

    def mapper(self, key, line):
        """Emit ``(word, line)`` for every comma-separated word in ``line``.

        ``key`` is accepted but not used. Surrounding whitespace is stripped
        from the line first, so the emitted line text is the trimmed line.
        """
        line = line.strip()
        for word in line.split(","):
            yield word, line

    def reducer(self, word, occurrences):
        """Emit ``word`` paired with a list of the lines collected for it.

        The values are materialised into a list before being yielded:
        mrjob documents the reducer's values argument as a generator, and
        a lazy iterator cannot be encoded by the standard ``json`` module,
        whereas a list can.
        """
        yield word, list(occurrences)


if __name__ == '__main__':
    # Launch the job only when this file is executed directly, not on import.
    MRWordCounter.run()
EDIT:此版本使用第二个 `step` 和新的 `reducer` 对结果进行排序:
from mrjob.job import MRJob
from mrjob.step import MRStep
class MRLineGrouperAndSorter(MRJob):
    """Two-step job: group lines by word, then emit the groups sorted by word."""

    def steps(self):
        """Return the job's two steps.

        Step one runs ``mapper`` and ``reducer`` to group lines by word;
        step two runs only ``reducer_sort`` to order the grouped results.
        """
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.reducer
            ),
            MRStep(
                reducer=self.reducer_sort
            )
        ]

    def mapper(self, key, line):
        """Emit ``(word, line)`` for every comma-separated word in ``line``.

        ``key`` is accepted but not used.
        """
        line = line.strip()  # remove surrounding whitespace from the line
        words = line.split(",")
        # Optional variant: to emit each line with its words in sorted
        # order, rebuild it here with ``line = ",".join(sorted(words))``.
        for word in words:
            yield word, line

    def reducer(self, word, values):
        """Re-emit each word's lines under the single shared key ``None``.

        Using the same key for every pair is intended to make the next
        step's reducer receive all ``(word, lines)`` items together.
        ``values`` is materialised into a list because a lazy iterator
        nested in the yielded tuple cannot be encoded by the standard
        ``json`` module when the pair is passed on to the next step.
        """
        yield None, (word, list(values))

    def reducer_sort(self, key, values):
        """Emit every ``(word, lines)`` pair in ascending order.

        ``key`` is expected to be ``None`` (see ``reducer``) and is not
        used. ``values`` holds the ``(word, lines)`` items from the first
        step; ``sorted`` reads them all into memory before any is emitted.
        """
        for word, lines in sorted(values):
            yield word, lines


if __name__ == '__main__':
    # Launch the job only when this file is executed directly, not on import.
    MRLineGrouperAndSorter.run()