将此权重/分数读数从输入.csv文件转换为列名列表,并使用Python Apache Beam
按其降序权重/分数矩阵格式排序并写入另一个.csv文件
Input .csv file
user_id, cat_1, cat_2, cat_3, cat_4, cat_5, cat_6
1 , 0.10, 0.2, 0.20, 0.12, 0.7, 0.6
2 , 0.6, 0.20, 0.12, 0.15, 0.13, 0.11
3 , 0.11, 0.10, 0.8, 0.12, 0.3, 0.7
Desired output .csv file
user_id, top_3_categories
1, [('cat_3', '0.20'), ('cat_2', '0.2'), ('cat_1', '0.10')]
2, [('cat_1', '0.6'), ('cat_2', '0.20'), ('cat_3', '0.12')]
3, [('cat_3', '0.8'), ('cat_1', '0.11'), ('cat_2', '0.10')]
以下步骤使用pandas
生成所需的输出:
with beam.Pipeline() as p:
lines = p | "ReadCsv" >> ReadFromText(file_pattern="input_csv",skip_header_lines=1)
def process_csv(line):
import pandas as pd
line = line.split(',')
df = pd.DataFrame(data=[line],columns=['user_id', 'cat_1', 'cat_2', 'cat_3', 'cat_4', 'cat_5', 'cat_6']).set_index('user_id')
df['top_3_categories'] = df.apply(lambda x: x.sort_values(ascending=False).iloc[:3].to_dict(OrderedDict), axis=1)
df = df['top_3_categories'].apply(lambda x: str([(k,v) for k,v in x.iteritems()])).reset_index()
return ",".join(list(df.iloc[0].values))
lines = lines | "Process Data" >> beam.Map(fn=process_csv)
lines | "Write csv" >> WriteToText(file_path_prefix="output.csv")