Python Pool.apply_async() returns None type objects

Problem description

I have a 4GB+ file in which every line is a deeply nested JSON string. A sample of the file looks like this:

{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
100,000+ lines like above

I need to do the following:

  • Parse each line of the file into a JSON object
  • Flatten each JSON object so that all key/value pairs sit on the same level, keeping only the handful of keys I need (each JSON object has 700+ keys in total and I need roughly 100 of them); see the example below for the kind of flattened record I'm after
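
For example, if the keys I keep included core_id and core_field1_subfield1 (illustrative names only, not my real key list), the sample line above would flatten down to roughly this:

{"core_id": "1", "core_field1_subfield1": 1}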

My plan is to split these 100K+ lines into chunks, use multiprocessing to flatten the JSON objects in each chunk, and then combine them back into a dataframe. Since I am new to multiprocessing, I read a few posts, including this and this (and more). I tried to write the code based on the first post, like so:

import json
import multiprocessing
import time
from multiprocessing import Pool, Process, Queue

import pandas as pd


# list of ~100 columns I want to keep
COLS_TO_KEEP = {'id', 'field1', 'subfield1', 'subsubfield2', 'field8', ..., 'field680'}


def chunk_the_list(list_of_items, chunk_size):
    for i in range(0, len(list_of_items), chunk_size):
        yield list_of_items[i : i + chunk_size]


def do_work(list_of_dicts):
    results = []
    for d in list_of_dicts:
        results.append(flatten_dict(d))


def flatten_dict(d, parent_key="", sep="_"):
    # This function recursively dives into each JSON object and flattens them
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            if new_key in COLS_TO_KEEP:
                items.append((new_key, v))
    return dict(items)


def main():
    raw_data_file_path_name = 'path/to/file.txt' # file is 4GB+
    file = open(raw_data_file_path_name, encoding="utf-8").readlines()
    listify = [json.loads(line) for line in file]
    size_of_each_chunk = int(len(listify) / (multiprocessing.cpu_count() - 2))
    chunked_data = chunk_the_list(listify, size_of_each_chunk)

    p = Pool(processes=multiprocessing.cpu_count() - 2)
    results = [p.apply_async(do_work, args=(chunk,)) for chunk in chunked_data]

    results = [item.get() for item in results] # This list has None type objects when I check using debugger
    results = sum(results, []) # because the above line returns None values, this step errors out 

    # I want to create a Pandas dataframe using the accumulated JSON/dictionary objects from results and write it out as a parquet or csv file at the end like this https://stackoverflow.com/a/20638258/1330974
    df = pd.DataFrame(results)
    df.to_parquet('df.parquet.gzip', compression='gzip')

if __name__ == "__main__":
    start_time = time.time()
    main()

Am I doing something wrong that causes None type objects to come back in the result set? Thanks in advance for your suggestions and answers!

python multiprocessing python-multiprocessing python-multithreading
1 Answer

1. Return the results from the worker function

Pool.apply_async() returns an AsyncResult, and calling .get() on it gives you whatever the worker function returned. Your do_work builds a list but never returns it, so each call implicitly returns None, which is exactly what ends up in your results list.

def do_work(list_of_dicts):
    results = []
    for d in list_of_dicts:
        results.append(flatten_dict(d))
    return results  # Add this line to return the results
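
A minimal, self-contained sketch of the behaviour (not part of your code): a function with no return statement implicitly returns None, and AsyncResult.get() simply hands that back.

from multiprocessing import Pool

def no_return(x):
    x * 2  # value is computed but never returned

def with_return(x):
    return x * 2

if __name__ == "__main__":
    with Pool(2) as p:
        print(p.apply_async(no_return, (5,)).get())    # prints None
        print(p.apply_async(with_return, (5,)).get())  # prints 10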

2. Flattening the JSON and filtering the columns
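
Your flatten_dict function already does this and is reused unchanged below. As a quick sanity check (COLS_TO_KEEP here is an assumed, illustrative subset of flattened key names, since your real list is elided):

COLS_TO_KEEP = {'core_id', 'core_field1_subfield1', 'core_field1_subfield2_subsubfield1'}

sample = {"core": {"id": "1",
                   "field1": {"subfield1": 1,
                              "subfield2": {"subsubfield1": 1}},
                   "field700": 700}}

print(flatten_dict(sample))
# {'core_id': '1', 'core_field1_subfield1': 1, 'core_field1_subfield2_subsubfield1': 1}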

3. Multiprocessing and joining the results:

import json
import multiprocessing
import pandas as pd

# list of ~100 columns I want to keep
COLS_TO_KEEP = {'id', 'field1', 'subfield1', 'subsubfield2', 'field8', ..., 'field680'}

def chunk_the_list(list_of_items, chunk_size):
    for i in range(0, len(list_of_items), chunk_size):
        yield list_of_items[i : i + chunk_size]

def flatten_dict(d, parent_key="", sep="_"):
    # This function recursively dives into each JSON object and flattens them
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            if new_key in COLS_TO_KEEP:
                items.append((new_key, v))
    return dict(items)

def do_work(list_of_dicts):
    results = []
    for d in list_of_dicts:
        results.append(flatten_dict(d))
    return results

def main():
    raw_data_file_path_name = 'path/to/file.txt'  # file is 4GB+
    with open(raw_data_file_path_name, encoding="utf-8") as file:
        listify = [json.loads(line) for line in file]

    size_of_each_chunk = int(len(listify) / (multiprocessing.cpu_count() - 2))
    chunked_data = chunk_the_list(listify, size_of_each_chunk)

    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as p:
        results = p.map(do_work, chunked_data)  # map is simpler than apply_async here and blocks until all chunks are done

    # Flatten the results list of lists into a single list
    flattened_results = [item for sublist in results for item in sublist]

    # Create Pandas DataFrame and save as Parquet file
    df = pd.DataFrame(flattened_results)
    df.to_parquet('df.parquet.gzip', compression='gzip')

if __name__ == "__main__":
    main()