Reducing the memory usage of json.loads with multiprocessing or Dask in Python


I have a CSV file with 1 million rows and a data size of 3 GB. I used pandas read_csv to load it into a DataFrame, and that works fine.

Next, I have to format the data columns and append another column based on the values of certain columns. To do this I am using a Dask DataFrame with npartitions and then applying a function row by row. Our instance has 7.5 GB of RAM, but it hangs and the process is killed with a MemoryError.

This is the code I am using to format the data columns:

import pandas as pd
import json
import dask.dataframe as dd
import multiprocessing

def formatting_data(data):

    print("cleaning and formatting data")

    data["IsBadJson"] = False
    data["BadJsonStr"] = None

    # Parentheses are required so the chained calls are valid across lines
    data = (
        dd.from_pandas(data, npartitions=4 * multiprocessing.cpu_count())
        .map_partitions(lambda df: df.apply(parse_data_to_json, axis=1))
        .compute(scheduler='processes')
    )

    return data

Below is the code of the function parse_data_to_json that we use for formatting:

def parse_data_to_json(x):

    try:
        if x.get("RequestSent") == "nan":
            # Capture the raw string before it is overwritten
            x["BadJsonStr"] = str(x.get("RequestSent"))
            x["RequestSent"] = None
            x["IsBadJson"] = True
        else:
            x["RequestSent"] = json.loads(x.get("RequestSent"))
    except Exception as error:
        print("Found an error value in Tax Json field RequestSent: {}, error details: {}".format(x.get("RequestSent"), error))
        x["BadJsonStr"] = str(x.get("RequestSent"))
        x["RequestSent"] = None
        x["IsBadJson"] = True

    try:
        if x.get("ResponseReceived") == "nan":
            x["BadJsonStr"] = str(x.get("ResponseReceived"))
            x["ResponseReceived"] = None
            x["IsBadJson"] = True
        else:
            # Leave IsBadJson / BadJsonStr untouched so a flag set by the
            # RequestSent block above is not clobbered
            x["ResponseReceived"] = json.loads(x.get("ResponseReceived"))
    except Exception as error:
        print("Found an error value in Tax Json field ResponseReceived: {}, error details: {}".format(x.get("ResponseReceived"), error))
        x["BadJsonStr"] = str(x.get("ResponseReceived"))
        x["ResponseReceived"] = None
        x["IsBadJson"] = True

    return x
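The two try blocks above duplicate the same per-field logic. A minimal stdlib-only sketch of how it could be factored into one helper (the name safe_json_load is hypothetical):

```python
import json

def safe_json_load(raw):
    # Return (parsed_value, is_bad, bad_str) for one raw field.
    # Mirrors the logic above: "nan" and unparseable strings are flagged,
    # and the offending raw string is captured before it is discarded.
    if raw is None or raw == "nan":
        return None, True, str(raw)
    try:
        return json.loads(raw), False, None
    except (TypeError, ValueError):
        return None, True, str(raw)
```

Each field of the row can then go through the same helper, so the flag and bad-string handling cannot drift between the two fields.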
Tags: python, pandas, multiprocessing, out-of-memory, dask
1 Answer

I recommend letting Dask load the data directly from the CSV rather than handing it a pandas DataFrame. See https://docs.dask.org/en/latest/best-practices.html#load-data-with-dask
