I am cleaning my text data and then want to save it to CSV. The cleaning function I defined works fine, but problems start as soon as the `to_csv()` step runs. Has anyone run into something similar and found a trick to work around it? Perhaps the data could be written to CSV in chunks?
```python
import gc
import time

import dask.dataframe as dd
from dask.distributed import Client

if __name__ == '__main__':
    # Initialize the Dask client
    client = Client(n_workers=3, threads_per_worker=1,
                    memory_limit='1.5GB')
    print('Dask client created')

    PATH = "C:\\Users\\el ruchenzo\\jobsproject\\jobsproject\\lt_data.csv"
    reqd = ['description', 'title', 'code']
    blocksize = 25e6  # reduced from 100 MB to 25 MB

    # Load the CSV with Dask
    df = dd.read_csv(PATH,
                     usecols=reqd,
                     blocksize=blocksize,
                     dtype={'code': 'float'},
                     engine='python',
                     encoding='utf-8',
                     on_bad_lines='skip')

    # Apply the cleaning function (wrapper_func, defined elsewhere) to the
    # 'title' column. Note: Dask is lazy, so these timings only measure
    # graph construction; the real work happens at to_csv().
    start_time = time.time()
    df['cleaned_title'] = df['title'].map_partitions(
        lambda partition: partition.apply(wrapper_func),
        meta=('cleaned_title', 'object'))
    gc.collect()
    end_time = time.time()
    print(f"1. Processing time: {end_time - start_time:.2f} seconds")

    # Apply the cleaning function to the 'description' column
    start_time = time.time()
    df['cleaned_description'] = df['description'].map_partitions(
        lambda partition: partition.apply(wrapper_func),
        meta=('cleaned_description', 'object'))
    gc.collect()
    end_time = time.time()
    print(f"2. Processing time: {end_time - start_time:.2f} seconds")

    # single_file=False writes one CSV per partition
    df.to_csv('cleaned_lemma_*.csv', index=False, encoding='utf-8',
              single_file=False)
    print('Saved to csv successfully')
    print('Work ended successfully')
```
I have tried changing the number of workers, the block size, the memory limit, and so on, but nothing seems to help. I also switched `map()` to `apply()`, tried `df = df.persist()`, tried writing the data to CSV in chunks with pandas instead of Dask, etc.
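For the "write in chunks" idea specifically, plain pandas can append successive slices to one file via `to_csv`'s `mode` and `header` parameters. A minimal sketch on a toy DataFrame (the real cleaning step is omitted; the file path is just a temp location for illustration):

```python
import os
import tempfile

import pandas as pd

# toy stand-in for the cleaned DataFrame
df = pd.DataFrame({"title": [f"t{i}" for i in range(25)],
                   "description": [f"d{i}" for i in range(25)]})

chunk_size = 10
out = os.path.join(tempfile.mkdtemp(), "cleaned.csv")

for start in range(0, len(df), chunk_size):
    chunk = df.iloc[start:start + chunk_size]
    # write the header once, then append without it
    chunk.to_csv(out, index=False, encoding="utf-8",
                 mode="w" if start == 0 else "a",
                 header=(start == 0))

print(pd.read_csv(out).shape)  # → (25, 2)
```

Only one chunk is ever held in the output buffer at a time, which keeps memory flat regardless of total row count.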
Use `np.array_split()` to split the DataFrame into multiple CSV files with 10k records each.
You can change the number of records per file as needed.
```python
import numpy as np

# np.array_split needs an in-memory (pandas) DataFrame, so a Dask
# DataFrame must be computed first -- only viable if it fits in RAM
df = df.compute()

num_parts = len(df) // 10000 + 1
split_data = np.array_split(df, num_parts)

for i, data_part in enumerate(split_data):
    print(f"Writing part no: {i+1}")
    file_name = f'Part_{i+1}.csv'
    data_part.to_csv(file_name, index=False)
    print(f"Part no. {i+1} has been exported as csv successfully!\n")
```
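To see how `array_split` distributes rows, here is a small self-contained demo (10 rows per part stands in for the 10k above). It splits integer positions rather than the frame itself, since calling `np.array_split` directly on a DataFrame is deprecated in newer pandas:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"title": [f"t{i}" for i in range(25)]})

rows_per_file = 10  # stand-in for the 10k in the answer above
num_parts = len(df) // rows_per_file + 1

# split the integer positions, then slice the frame with iloc
parts = [df.iloc[idx]
         for idx in np.array_split(np.arange(len(df)), num_parts)]
print([len(p) for p in parts])  # → [9, 8, 8]
```

Note that `array_split` (unlike `split`) tolerates uneven divisions, handing the leftover rows to the earliest parts.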