I wrote a Hadoop streaming reducer (Python 3), but it doesn't work correctly, for example on the following input:

```python
data = 'dog\t1\t1\ndog\t1\t1\ndog\t0\t1\ndog\t0\t1\ncat\t0\t1\ncat\t0\t1\ncat\t1\t1\n'
```
```python
import re
import sys

# initialize trackers
current_word = None

spam_count, ham_count = 0, 0

# read from standard input
# Substitute read from a file

for line in data.splitlines():
#for line in sys.stdin:
    # parse input
    word, is_spam, count = line.split('\t')
    count = int(count)

    if word == current_word:
        if is_spam == '1':
            spam_count += count
        else:
            ham_count += count
    else:
        if current_word:
            # word to emit...
            if spam_count:
                print("%s\t%s\t%s" % (current_word, '1', spam_count))
            print("%s\t%s\t%s" % (current_word, '0', ham_count))

        if is_spam == '1':
            current_word, spam_count = word, count
        else:
            current_word, ham_count = word, count

if current_word == word:
    if is_spam == '1':
        print(f'{current_word}\t{is_spam}\t{spam_count}')
    else:
        print(f'{current_word}\t{is_spam}\t{spam_count}')
```
I get:

```
#dog 1 2
#dog 0 2
#cat 1 3
```
The two "spam" dogs are OK, and so are the two "ham" dogs. The cats are not handled correctly. The output should be:
```
#dog 1 2
#dog 0 2
#cat 0 2
#cat 1 1
```
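The reason is that you should reset `ham_count`, not only update `spam_count`, and vice versa.

Rewrite

```python
if is_spam == '1':
    current_word, spam_count = word, count
else:
    current_word, ham_count = word, count
```

as

```python
if is_spam == '1':
    current_word, spam_count = word, count
    ham_count = 0
else:
    current_word, ham_count = word, count
    spam_count = 0
```

Even so, the output won't match yours exactly:

1) you always print `spam_count` first (but in your expected result, "cat ham" is emitted first);
2) the output block at the end emits either only spam or only ham, depending on the current value of the `is_spam` variable, but I guess you intended to emit both, right?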
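With only that change, the output is:

```
dog 1 2
dog 0 2
cat 1 1
```

It has the correct count for "cat spam", but no "cat ham" line at all. I think you should at least print something like this. Rewrite this code

```python
if current_word == word:
    if is_spam == '1':
        print(f'{current_word}\t{is_spam}\t{spam_count}')
    else:
        print(f'{current_word}\t{is_spam}\t{spam_count}')
```

as

```python
# flush the last word with both counters (the guard skips empty input)
if current_word:
    print(f'{current_word}\t1\t{spam_count}')
    print(f'{current_word}\t0\t{ham_count}')
```

and the full output will be:

```
dog 1 2
dog 0 2
cat 1 1
cat 0 2
```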
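For reference, here is a sketch of the whole loop-based reducer with both fixes applied. It is slightly restructured: the loop is wrapped in a generator, `reduce_lines`, and the per-word output goes through a helper, `_format_word`. Both names are hypothetical and not from the question. Because `reduce_lines` accepts any iterable of lines, you can feed it `data.splitlines()` for testing or `sys.stdin` in the real job. It assumes the input is sorted by word, as Hadoop delivers it to a reducer.

```python
def _format_word(word, spam_count, ham_count):
    """Hypothetical helper: output lines for one word.

    Same rule as the question: the spam line only if it is non-zero,
    the ham line always.
    """
    lines = []
    if spam_count:
        lines.append(f'{word}\t1\t{spam_count}')
    lines.append(f'{word}\t0\t{ham_count}')
    return lines


def reduce_lines(lines):
    """Hypothetical wrapper: yield word<TAB>is_spam<TAB>sum lines
    from word<TAB>is_spam<TAB>count lines sorted by word."""
    current_word = None
    spam_count, ham_count = 0, 0
    for line in lines:
        line = line.rstrip('\n')  # lines read from sys.stdin keep their newline
        if not line:
            continue  # skip blank lines
        word, is_spam, count = line.split('\t')
        count = int(count)
        if word != current_word:
            if current_word is not None:
                yield from _format_word(current_word, spam_count, ham_count)
            # fix 1: a new word starts with BOTH counters reset
            current_word, spam_count, ham_count = word, 0, 0
        if is_spam == '1':
            spam_count += count
        else:
            ham_count += count
    if current_word is not None:
        # fix 2: flush the last word with both of its counters
        yield from _format_word(current_word, spam_count, ham_count)


data = 'dog\t1\t1\ndog\t1\t1\ndog\t0\t1\ndog\t0\t1\ncat\t0\t1\ncat\t0\t1\ncat\t1\t1\n'
for out_line in reduce_lines(data.splitlines()):
    print(out_line)
```

In the reducer script you would loop over `reduce_lines(sys.stdin)` instead. Resetting both counters in one place makes it harder to forget one of them again.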
Itertools

Also, the `itertools` module is well suited to this kind of task:

```python
import itertools

splitted_lines = map(lambda x: x.split('\t'), data.splitlines())
grouped = itertools.groupby(splitted_lines, lambda x: x[0])
```

`grouped` is an `itertools.groupby` object, which is an iterator. Be careful: it is lazy and yields its values only once. The output below is shown just as an illustration, because producing it consumes the iterator:

```python
[(gr_name, list(gr)) for gr_name, gr in grouped]
```

Out:

```
[('dog',
  [['dog', '1', '1'],
   ['dog', '1', '1'],
   ['dog', '0', '1'],
   ['dog', '0', '1']]),
 ('cat', [['cat', '0', '1'], ['cat', '0', '1'], ['cat', '1', '1']])]
```
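The following small check shows what "yields its values only once" means in practice. It uses the question's `data` and a hypothetical helper, `make_grouped`, that rebuilds the pipeline from the snippet above. It iterates the same `groupby` object twice, and the second pass comes back empty. It also shows a related gotcha from the `groupby` documentation: once the outer iterator advances, the previous group is no longer visible. So if you materialise the outer iterator with `list()` first, every inner group is empty.

```python
import itertools

data = 'dog\t1\t1\ndog\t1\t1\ndog\t0\t1\ndog\t0\t1\ncat\t0\t1\ncat\t0\t1\ncat\t1\t1\n'


def make_grouped():
    # hypothetical helper: a fresh map + groupby chain on every call
    splitted_lines = map(lambda x: x.split('\t'), data.splitlines())
    return itertools.groupby(splitted_lines, lambda x: x[0])


grouped = make_grouped()
first_pass = [(name, len(list(gr))) for name, gr in grouped]
second_pass = [(name, len(list(gr))) for name, gr in grouped]
print(first_pass)   # [('dog', 4), ('cat', 3)]
print(second_pass)  # [] because the iterator is already exhausted

# consuming the outer iterator first leaves the earlier groups empty
stale = [(name, list(gr)) for name, gr in list(make_grouped())]
print(stale)        # [('dog', []), ('cat', [])]
```

This is why the snippets here rebuild `splitted_lines` and `grouped` before every use, and why each group is consumed before moving on to the next one.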
OK, now each group can in turn be grouped by its `is_spam` feature:

```python
import itertools

def sum_group(group):
    """
    >>> sum_group([['dog', '1', '1'], ['dog', '1', '1']])
    2
    """
    return sum(int(i[-1]) for i in group)

splitted_lines = map(lambda x: x.split('\t'), data.splitlines())
grouped = itertools.groupby(splitted_lines, lambda x: x[0])
[(name, [(tag_name, sum_group(sub_group))
         for tag_name, sub_group
         in itertools.groupby(group, lambda x: x[1])])
 for name, group in grouped]
```

Out:

```
[('dog', [('1', 2), ('0', 2)]), ('cat', [('0', 2), ('1', 1)])]
```
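One caveat: `itertools.groupby` only merges items that are next to each other. Hadoop sorts reducer input by key, and by default a streaming job treats the text up to the first tab (the word) as the key. So, unless the job is configured to sort on more fields, I would not rely on the `is_spam` values of one word arriving in any particular order. If they interleave, the inner `groupby` splits one tag into several groups. The sketch below shows that effect. It also shows a variant, `sum_by_tag` (a hypothetical name), that still groups by word but tallies the tags in a `collections.Counter`.

```python
import itertools
from collections import Counter


def sum_by_tag(lines):
    """Hypothetical helper: per word, total the counts for each is_spam tag.

    Lines are grouped by word with itertools.groupby (reducer input is
    sorted by word), but the tags inside one word are tallied in a Counter,
    so interleaved '0'/'1' values for that word are still merged.
    """
    rows = (line.rstrip('\n').split('\t') for line in lines if line.strip())
    for word, group in itertools.groupby(rows, key=lambda r: r[0]):
        totals = Counter()
        for _, tag, count in group:
            totals[tag] += int(count)
        yield word, dict(totals)


# one word whose is_spam values are NOT adjacent
interleaved = ['cat\t0\t1', 'cat\t1\t1', 'cat\t0\t1']

# the nested groupby approach: the '0' tag ends up split into two groups
rows = (s.split('\t') for s in interleaved)
naive = [(tag, sum(int(r[-1]) for r in sub))
         for _, group in itertools.groupby(rows, lambda r: r[0])
         for tag, sub in itertools.groupby(group, lambda r: r[1])]
print(naive)                          # [('0', 1), ('1', 1), ('0', 1)]
print(list(sum_by_tag(interleaved)))  # [('cat', {'0': 2, '1': 1})]
```

The `Counter` costs a small dict per word, which is cheap compared with holding a whole group in memory.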
Full example via itertools:

```python
import itertools

def emit_group(name, tag_name, group):
    tag_sum = sum(int(i[-1]) for i in group)
    print(f"{name}\t{tag_name}\t{tag_sum}")  # emit here
    return (name, tag_name, tag_sum)  # return the same data

splitted_lines = map(lambda x: x.split('\t'), data.splitlines())
grouped = itertools.groupby(splitted_lines, lambda x: x[0])
emitted = [[emit_group(name, tag_name, sub_group)
            for tag_name, sub_group
            in itertools.groupby(group, lambda x: x[1])]
           for name, group in grouped]
```

Out:

```
dog 1 2
dog 0 2
cat 0 2
cat 1 1
```

This emits the lines in the same order as your expected output. `emitted` is a list of lists of tuples holding the same data that was printed. Because the approach is built on lazy iterators, it works well with streamed input.

There is a good itertools tutorial if you are interested.
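To run the itertools version as an actual streaming reducer, the same idea can read from a stream instead of the `data` string. This is a sketch under a few assumptions. The function name `reduce_stream` is hypothetical. The mapper is assumed to emit `word<TAB>is_spam<TAB>count` lines. The trailing newline that lines read from stdin carry is stripped before splitting. The sketch also groups on the `(word, is_spam)` pair in one `groupby` call, which produces the same runs as the nested version above.

```python
import io
import itertools
import sys


def reduce_stream(stream, out=None):
    """Hypothetical itertools-based streaming reducer.

    Reads word<TAB>is_spam<TAB>count lines that arrive sorted by word (as a
    Hadoop reducer receives them) and writes one word<TAB>is_spam<TAB>sum
    line per consecutive run of the same (word, is_spam) pair.
    """
    if out is None:
        out = sys.stdout
    # lines read from a file or stdin keep their trailing newline
    rows = (line.rstrip('\n').split('\t') for line in stream if line.strip())
    # grouping on the (word, is_spam) pair gives the same runs as grouping
    # by word first and then by is_spam inside each word
    for (word, tag), group in itertools.groupby(rows, key=lambda r: (r[0], r[1])):
        total = sum(int(r[2]) for r in group)
        out.write(f'{word}\t{tag}\t{total}\n')


# local check with the question's sample input instead of sys.stdin
data = 'dog\t1\t1\ndog\t1\t1\ndog\t0\t1\ndog\t0\t1\ncat\t0\t1\ncat\t0\t1\ncat\t1\t1\n'
result = io.StringIO()
reduce_stream(io.StringIO(data), result)
print(result.getvalue(), end='')
```

In the reducer script itself, the last line would simply be `reduce_stream(sys.stdin)`. Like the nested `groupby` version, it only merges `is_spam` values that arrive next to each other.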