I have a CSV file with 1 million rows and a data size of 3 GB. I used pandas read_csv
to convert it to a DataFrame, and that works fine.
Next, I have to format the data columns and append another column based on the values of some columns. To do this, I am using a Dask DataFrame with npartitions and then applying the function row by row. Our instance has 7.5 GB of RAM, but it hangs and the process is killed with a MemoryError.
Here is the code I am using to format the data columns:
import pandas as pd
import json
import dask.dataframe as dd
import multiprocessing

def formatting_data(data):
    print("cleaning and formatting data")
    data["IsBadJson"] = False
    data["BadJsonStr"] = None
    data = (
        dd.from_pandas(data, npartitions=4 * multiprocessing.cpu_count())
          .map_partitions(lambda df: df.apply(lambda row: parse_data_to_json(row), axis=1))
          .compute(scheduler='processes')
    )
    return data
Below is the code for the function parse_data_to_json that we use for formatting:
def parse_data_to_json(x):
    try:
        if x.get("RequestSent") == "nan":
            x["RequestSent"] = None
            x["IsBadJson"] = True
            x["BadJsonStr"] = str(x.get("RequestSent"))
        else:
            x["RequestSent"] = json.loads(x.get("RequestSent"))
            x["IsBadJson"] = False
            x["BadJsonStr"] = None
    except Exception as error:
        print("Found an error value in Tax Json field RequestSent: {}, error details: {}".format(x.get("RequestSent"), error))
        print("{}-{}-{}".format(None, True, str(x.get("RequestSent"))))
        x["RequestSent"] = None
        x["IsBadJson"] = True
        x["BadJsonStr"] = str(x.get("RequestSent"))

    try:
        if x.get("ResponseReceived") == "nan":
            x["ResponseReceived"] = None
            x["IsBadJson"] = True
            x["BadJsonStr"] = str(x.get("ResponseReceived"))
        else:
            x["ResponseReceived"] = json.loads(x.get("ResponseReceived"))
            x["IsBadJson"] = False
            x["BadJsonStr"] = None
    except Exception as error:
        print("Found an error value in Tax Json field ResponseReceived: {}, error details: {}".format(x.get("ResponseReceived"), error))
        print("{}-{}-{}".format(None, True, str(x.get("ResponseReceived"))))
        x["ResponseReceived"] = None
        x["IsBadJson"] = True
        x["BadJsonStr"] = str(x.get("ResponseReceived"))

    return x
I would suggest letting Dask load the data directly from the CSV instead of handing it a pandas DataFrame. With dd.from_pandas(...).compute(scheduler='processes'), the full DataFrame sits in the parent process, gets serialized out to every worker, and the complete result is collected back into memory at the end, which is where the MemoryError comes from. See https://docs.dask.org/en/latest/best-practices.html#load-data-with-dask
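As a minimal sketch of that approach (the file name data.csv and the 64MB blocksize are placeholders, and the column names are taken from your question, so adjust them to your data):

import json
import dask.dataframe as dd

# Let Dask read the CSV itself so each worker only loads its own partition,
# instead of first materializing the full 3 GB file as a pandas DataFrame.
ddf = dd.read_csv(
    "data.csv",          # hypothetical path to your file
    blocksize="64MB",    # size of each partition read from disk; tune to your RAM
    dtype={"RequestSent": "object", "ResponseReceived": "object"},
)

ddf["IsBadJson"] = False
ddf["BadJsonStr"] = None

# Reuse the row-wise parser from the question on each partition.
ddf = ddf.map_partitions(
    lambda df: df.apply(parse_data_to_json, axis=1),
    meta=ddf._meta,  # output keeps the same columns as the input
)

# Write the result out partition by partition (one file per partition),
# so the full formatted result never has to fit in RAM at once.
ddf.to_csv("formatted-*.csv", index=False)

If you do need the result as a single pandas DataFrame at the end, you can still call .compute(), but then the whole output has to fit in memory, so writing it out per partition (or converting to Parquet) is usually the safer option on a 7.5 GB machine.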