我正在测试一个简单的示例来了解 MapReduce 和 mrjob。
目标是将所有数字的对数相加,并将所有数字的计数除以该总和。
代码非常简单明了:
# mrMedian.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import math
class MrMedian(MRJob):
def __init__(self, *args, **kwargs):
super(MrMedian, self).__init__(*args, **kwargs)
self.inCount = 0
self.inLogSum = 0.0
#increment the count of elements and add the
# logarithm of the current number to the summation
def map(self, key, val):
inVal = float(val)
self.inCount += 1
self.inLogSum += math.log(inVal)
# return the count and summation after all numbers are processed
def map_final(self):
yield (1, [self.inCount, self.inLogSum])
# aggregate the count and summation values and yield the result
def reduce(self, key, packedValues):
cumLogSum=1.0
cumN=0
for valArr in packedValues:
nj = int(valArr[0])
cumN += nj
cumLogSum += float(valArr[1])
median = cumN/cumLogSum
yield (median)
# define mapper and reducer
def steps(self):
return ([
MRStep(mapper=self.map, reducer=self.reduce, mapper_final=self.map_final)
])
# to run:
# python MrMedian.py < inputFile.txt
if __name__ == '__main__':
MrMedian.run()
在
map_final
方法中,我得到了(1, [self.inCount, self.inLogSum])
。值 1
是被忽略的键,列表 [self.inCount, self.inLogSum]
是在 reduce
方法中我们应该将其 (packedValues
) 视为可迭代的值,并使用 for
以某种方式迭代它循环。
我收到此错误:
(venv) shahriar@Lenovo:/media/shahriar/01D779182B58B9D0$ python mrMedian.py < inputFile.txt > outFile.txt No configs found; falling back on auto-configuration No configs specified for inline runner Creating temp directory /tmp/mrMedian.shahriar.20221113.152412.029427 Running step 1 of 1... reading from STDIN
Error while reading from /tmp/mrMedian.shahriar.20221113.152412.029427/step/000/reducer/00000/input:
Traceback (most recent call last):
File "/media/shahriar/01D779182B58B9D0/assignment2/mrMedian.py", line 43, in <module>
MrMedian.run()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 616, in run
cls().execute()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 687, in execute
self.run_job()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 636, in run_job
runner.run()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/runner.py", line 503, in run
self._run()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 161, in _run
self._run_step(step, step_num)
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 170, in _run_step
self._run_streaming_step(step, step_num)
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 187, in _run_streaming_step
self._run_reducers(step_num, num_reducer_tasks)
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 289, in _run_reducers
self._run_multiple(
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 130, in _run_multiple
func()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 746, in _run_task
invoke_task( File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/inline.py", line 133, in invoke_task
task.execute()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 681, in execute
self.run_reducer(self.options.step_num)
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 795, in run_reducer
for k, v in self.reduce_pairs(read_lines(), step_num=step_num):
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 866, in reduce_pairs
for k, v in self._combine_or_reduce_pairs(pairs, 'reducer', step_num):
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 889, in _combine_or_reduce_pairs
for k, v in task(key, values) or ():
TypeError: cannot unpack non-iterable float object
作为
map_final
方法结果的输入文件是可以的:
shahriar@Lenovo-:/tmp/mrMedian.shahriar.20221113.152412.029427/step/000/reducer/00000$ cat input
1 [13, 78.5753201837955]
1 [13, 77.20894832945609]
1 [12, 75.70546637672973]
1 [12, 73.97942285230064]
1 [13, 78.7642193551817]
1 [13, 74.83203774429285]
1 [13, 72.28868623927899]
1 [11, 67.51370208632588]
我评论了reducer方法中的
for
循环来检查错误是否是由于packedValues
造成的,但我再次收到错误。
任何想法表示赞赏。
我已经通过reducer函数的yield(键,值)对解决了类似的问题。
def reduce(self, key, packed_values):
...
# must yield k, v here
yield 1, cumulative_n / cumulative_ln_val
查看 job.py 的源代码,看起来“line 889, in _combine_or_reduce_pairs”期望来自reducer函数的键值结果。
def _combine_or_reduce_pairs(self, pairs, mrc, step_num=0):
"""Helper for :py:meth:`combine_pairs` and :py:meth:`reduce_pairs`."""
step = self._get_step(step_num, MRStep)
task = step[mrc]
task_init = step[mrc + '_init']
task_final = step[mrc + '_final']
if task is None:
raise ValueError('No %s in step %d' % (mrc, step_num))
if task_init:
for k, v in task_init() or ():
yield k, v
# group all values of the same key together, and pass to the reducer
#
# be careful to use generators for everything, to allow for
# very large groupings of values
for key, pairs_for_key in itertools.groupby(pairs, lambda k_v: k_v[0]):
values = (value for _, value in pairs_for_key)
for k, v in task(key, values) or ():
yield k, v