我正在尝试使用dask聚合具有多行不良数据的大型(66gb)数据库。
由于dask没有消除不良行功能,因此我首先将所有数据作为pandas数据框读取,并清除不良行。然后,我将其转换为dask数据框。我的代码如下:
将dask.dataframe导入为dd将熊猫作为pd导入从dask.distributed import客户端
#Groups the average Thresholds by NEATGeneration and finds the mean, standard deviation, minimum and maximum of the data
def group(df):
res = df.groupby(df["NEATGeneration"]).agg({'averageThreshold': ['mean', 'std','max','min']}).compute()
return res
if __name__ == '__main__':
Client(n_workers=4, threads_per_worker=6,memory_limit='120GB')
#Loads in the data as a pandas datframe inlcuding bad lines
df = dd.read_csv("agentsvfitness.txt",error_bad_lines=False,usecols=["NEATGeneration","averageThreshold"])
#Replaces elements in the averageThreshold column that are not numeric with NA
pd.to_numeric(df['averageThreshold'] , errors ='coerce')
#Removes rows with NA
df = df.dropna()
#runs the group() function in parallel
df = group(df)
#Sets all column names and prepares data for writing to csv
df.columns = ['mean', 'std','max','min']
#Writes aggregated data to a single csv file
df.to_csv("averageThreshold.csv")
我遇到的问题是当以下列方式错误记录数据(强制):
NEATGeneration,averageFitness,averageResourcesConsumed,averageThreshold
0,8.32,0.8533333333333334,0.4819999999999999
0,8.486666666666666,1.7266666666666666.47333333333333333 #lacking“,0”
0,8.0533333333333331.8466666666666666,0.4500000000000001#缺少“,“
0,8.306666666666667,1.9466666666666668,0.44933131583851454
当将数据读入dask数据帧时,我当前的方法无法删除这些行。有什么方法可以从现有数据框中删除这些不良行吗?还有其他方法只能读取“好”数据(数据点数正确的数据)吗?我正在具有24个CPU和120GB内存的群集上运行。
我的猜测是,read_csv
关键字的某种组合可以解决您的问题,但是我对它们不太熟悉,所以我将建议一种替代方法。
您可以使用Dask Bag将文本行读取为文本而不是Pandas Dataframe。然后,您可以使用Python函数(可能是通过计算逗号或其他数字)来过滤掉坏行,然后可以将其写回到文本文件中,然后使用Dask Dataframe重新读取,因为数据要多一些清理。也许还有一些不错的方法可以将Dask Bag变成Dask Dataframe,而无需将中间文件写入磁盘,但这可能稍微复杂一些。