I am running this code to compute probabilities in Hadoop; my data is stored in a CSV file.
When I run it on the cluster I get the error "java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1". Can anyone help me fix my code?
#!/usr/bin/env python3
"""mapper.py"""
import sys

# Get input lines from stdin
for line in sys.stdin:
    # Remove spaces from beginning and end of the line
    line = line.strip()
    # Split it into tokens
    tokens = line.split('')
    # Get ClassA values
    try:
        ClassA = tokens[1]
        print ('%s\t%s') % (None, ClassA)
    except ValueError: pass

#!/usr/bin/env python3
"""reducer.py"""
import sys
from collections import Counter

# Create a dictionary to map marks
Classprob = {}

# Get input from stdin
for line in sys.stdin:
    # Remove spaces from beginning and end of the line
    line = line.strip()
    # parse the input from mapper.py
    ClassA = line.split('\t', 1)
    #from collections import Counter
    #samples = [10,10,60,10,30]
    counts = Counter(ClassA)
    total = sum(counts.values())
    probability_mass = {k: v/total for k, v in counts.items()}
    probability_mass
    probability_mass.get(4)

# Print each probability
for probability_mass in Classprob.keys():
    print ('%s\t%s') % (probability_mass, Classprob[probability_mass])
    #print(str(probability_mass))
The real error should be available in the YARN UI, but
print ('%s\t%s') % (ClassA)
needs two values to format. If you have no key to group the values by, you can use
print('%s\t%s' % (None, ClassA))
Also, variables should not start with a capital letter, and please refer to http://pyformat.info for string formatting.
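Putting those fixes together, a minimal sketch of the mapper could look like this; it assumes the CSV is comma-separated and that the class label sits in column index 1 (adjust both to your actual data):

#!/usr/bin/env python3
"""mapper.py sketch: emit (None, class_label) pairs from a comma-separated CSV."""
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    # Assumption: fields are comma-separated; the original split('') raises ValueError (empty separator).
    tokens = line.split(',')
    try:
        class_a = tokens[1]  # lowercase name, per the naming advice above
    except IndexError:
        continue  # skip rows without a second column
    # Constant key so every value is grouped into the same reducer.
    print('%s\t%s' % (None, class_a))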
You can test the code without Hadoop using
python mapper.py | sort -u | python reducer.py
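For that pipe test to print anything, the reducer has to accumulate all incoming values before computing the distribution; a sketch under the same assumptions:

#!/usr/bin/env python3
"""reducer.py sketch: count each class label read from stdin and print its probability."""
import sys
from collections import Counter

counts = Counter()
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    # Mapper output is "key<TAB>value"; keep only the value part.
    _, _, value = line.partition('\t')
    counts[value] += 1

total = sum(counts.values())
for value, count in counts.items():
    print('%s\t%s' % (value, count / total))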
Plus, mrjob or pyspark are higher-level frameworks that offer more useful features.
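As an illustration of the mrjob route, a rough sketch of the same job (the comma delimiter and column index 1 are still assumptions, and classprob.py is a hypothetical file name):

from collections import Counter

from mrjob.job import MRJob


class ClassProbability(MRJob):
    """Compute the relative frequency of each class label in column index 1."""

    def mapper(self, _, line):
        tokens = line.strip().split(',')
        if len(tokens) > 1:
            # Constant key so one reducer call sees every label.
            yield None, tokens[1]

    def reducer(self, _, labels):
        counts = Counter(labels)
        total = sum(counts.values())
        for label, count in counts.items():
            yield label, count / total


if __name__ == '__main__':
    ClassProbability.run()

Locally this runs as python classprob.py data.csv; adding -r hadoop submits it to the cluster instead.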