我有一个 4GB 以上的文件,其中每一行都代表一个非常嵌套的 JSON 字符串。该文件的示例如下:
{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
100,000+ lines like above
我需要执行以下操作:
我的计划是将这 100K 多行分成多个块,并使用
multiprocessing
展平每个块中的 JSON 对象并将它们组合回数据帧。因为我是 multiprocessing
的新手,所以我读了一些帖子,包括 this 和 this (以及更多)。我尝试根据第一篇文章编写代码,如下所示:
import json
import multiprocessing
from multiprocessing import Pool, Process, Queue
import pandas as pd
# list of ~100 columns I want to keep
COLS_TO_KEEP = {'id', 'field1', 'subfield1', 'subsubfield2', 'field8', ..., 'field680'}
def chunk_the_list(list_of_items, chunk_size):
for i in range(0, len(list_of_items), chunk_size):
yield list_of_items[i : i + chunk_size]
def do_work(list_of_dicts):
results = []
for d in list_of_dicts:
results.append(flatten_dict(d))
def flatten_dict(d, parent_key="", sep="_"):
# This function recursively dives into each JSON object and flattens them
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, dict):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
if new_key in COLS_TO_KEEP:
items.append((new_key, v))
return dict(items)
def main():
raw_data_file_path_name = 'path/to/file.txt' # file is 4GB+
file = open(raw_data_file_path_name, encoding="utf-8").readlines()
listify = [json.loads(line) for line in file]
size_of_each_chunk = int(len(listify) / (multiprocessing.cpu_count() - 2))
chunked_data = chunk_the_list(listify, size_of_each_chunk)
p = Pool(processes=multiprocessing.cpu_count() - 2)
results = [p.apply_async(do_work, args=(chunk,)) for chunk in chunked_data]
results = [item.get() for item in results] # This list has None type objects when I check using debugger
results = sum(results, []) # because the above line returns None values, this step errors out
# I want to create a Pandas dataframe using the accumulated JSON/dictionary objects from results and write it out as a parquet or csv file at the end like this https://stackoverflow.com/a/20638258/1330974
df = pd.DataFrame(results)
df.to_parquet('df.parquet.gzip', compression='gzip')
if __name__ == "__main__":
start_time = time.time()
main()
我是否做错了什么才能将
None
类型对象重新返回到结果集中?预先感谢您的建议和答案!
1.处理多处理结果
def do_work(list_of_dicts):
results = []
for d in list_of_dicts:
results.append(flatten_dict(d))
return results # Add this line to return the results
2.扁平化 JSON 和过滤列
3.多重处理和连接结果:
import json
import multiprocessing
import pandas as pd
# list of ~100 columns I want to keep
COLS_TO_KEEP = {'id', 'field1', 'subfield1', 'subsubfield2', 'field8', ..., 'field680'}
def chunk_the_list(list_of_items, chunk_size):
for i in range(0, len(list_of_items), chunk_size):
yield list_of_items[i : i + chunk_size]
def flatten_dict(d, parent_key="", sep="_"):
# This function recursively dives into each JSON object and flattens them
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, dict):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
if new_key in COLS_TO_KEEP:
items.append((new_key, v))
return dict(items)
def do_work(list_of_dicts):
results = []
for d in list_of_dicts:
results.append(flatten_dict(d))
return results
def main():
raw_data_file_path_name = 'path/to/file.txt' # file is 4GB+
with open(raw_data_file_path_name, encoding="utf-8") as file:
listify = [json.loads(line) for line in file]
size_of_each_chunk = int(len(listify) / (multiprocessing.cpu_count() - 2))
chunked_data = chunk_the_list(listify, size_of_each_chunk)
p = multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2)
results = p.map(do_work, chunked_data) # Use map for simplicity
# Flatten the results list of lists into a single list
flattened_results = [item for sublist in results for item in sublist]
# Create Pandas DataFrame and save as Parquet file
df = pd.DataFrame(flattened_results)
df.to_parquet('df.parquet.gzip', compression='gzip')
if __name__ == "__main__":
main()