我的目标是打乱一个太大而无法立即加载到我的设备 RAM 中的 csv 文件。我的计划是使用 csv.DictReader 遍历整个文件,并使用 csv.DictWriter 将每一行随机写入一组较小文件中的一个,然后打乱所有较小文件,然后再次连接它们。然而,我无法完成第一步,因为较小的文件无法用 pandas 打开,因为它会给出以下错误:
pandas.errors.EmptyDataError: No columns to parse from file
。
为了执行第一步,我创建了一个 csv DictWriter 文件列表,每个小文件对应一个我想要将主文件拆分成的小文件,以便将主文件的每一行随机分配给其中一个 DictWriter 文件。这是一个虚拟示例,说明了第一步的想法,但在综合生成的 csv 文件上,它给出了相同的错误:
import pandas as pd
import random
import numpy as np
import csv
import os
#First, creating a dummy file just containing integers 0-19 over 2 columns.
data=pd.DataFrame({'col1':list(range(10)),'col2':list(range(10,20))})
data.to_csv('test_file.csv',index=False)
n_chunks=2 #For this example I only split the dummy file into two smaller files
#Next, make a list of DictWriter objects, one for each smaller file
file_names=[f"test_batch_{batch_no}.csv" for batch_no in list(range(n_chunks))]
chunks=[csv.DictWriter(open(file_name,'w'),["col1","col2"]) for file_name in file_names]
#Make headers for each smaller file
for chunk in chunks:
chunk.writeheader()
#Now, randomly assign each line in test_file.csv to one of the smaller files.
with open("test_file.csv",newline='') as data:
reader=csv.DictReader(data)
for line in reader:
i=random.randint(0,n_chunks-1)
chunks[i].writerow(line)
for file_name in file_names:
#The next line gives the error.
chunk=pd.read_csv(file_name)
奇怪的是,较小的文件完全由直接从主文件复制的行组成,在本例中,主文件本身是作为 pandas 文件创建的,并且可以作为 pandas 数据帧加载,没有任何问题。此外,当我检查创建的两个较小文件(“test_batch_0.csv”和“test_batch_1.csv”)时,例如记事本,它们对我来说看起来像常规的 csv 文件,但仍然不知何故 pd.read_csv 无法读取它。
我尝试更改“打开”命令的换行符参数,因为我在之前的帖子中发现“没有要解析表单文件的列”有时是由不正确的换行符引起的,但无济于事。
为了完整起见,以下是完整的错误路径:
Traceback (most recent call last):
File "[...]/main.py", line 29, in <module>
chunk=pd.read_csv(file_name)
^^^^^^^^^^^^^^^^^^^^^^
File "[...]/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 1026, in read_csv
return _read(filepath_or_buffer, kwds)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "[...]/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 620, in _read
parser = TextFileReader(filepath_or_buffer, **kwds)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "[...]/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 1620, in __init__
self._engine = self._make_engine(f, self.engine)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "[...]/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 1898, in _make_engine
return mapping[engine](f, **self.options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "[...].venv/lib/python3.12/site-packages/pandas/io/parsers/c_parser_wrapper.py", line 93, in __init__
self._reader = parsers.TextReader(src, **kwds)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "parsers.pyx", line 581, in pandas._libs.parsers.TextReader.__cinit__
pandas.errors.EmptyDataError: No columns to parse from file
提前非常感谢!
要预处理较大的文件并将其随机分片为较小的文件,您不需要 Pandas(即使最终分析需要 Pandas)。您可以使用 DictReader 和 DictWriter:
import csv
import random
N = 10 # the number of sharded output CSVs
OUT_NAME = "output_{i:0>3}.csv" # pad the file's number with up to three zeroes
out_files = [open(OUT_NAME.format(i=i), "w", newline="") for i in range(N)]
writers = [
csv.DictWriter(
out_files[i],
fieldnames=["Col_1", "Col_2"],
extrasaction="ignore", # ignore columns other than Col_1 or Col_2, as read by DictReader
)
for i in range(N)
]
for w in writers:
w.writeheader()
reader = csv.DictReader(open("input.csv", newline=""))
for row in reader:
w = random.choice(writers)
w.writerow(row)
我创建了一个大小为 620MB(1_000_000 行 x 100 列)的示例输入 CSV,并在其上运行该程序,最终得到 10 个文件,每个文件大小约为 1.3MB(仅从 100 列中选取 2 个):
137K Nov 17 21:37 output_000.csv
138K Nov 17 21:37 output_001.csv
... ... ...
138K Nov 17 21:37 output_008.csv
134K Nov 17 21:37 output_009.csv
创建 10 个文件几乎没有使用任何超出 Python 运行时所需的内存(在我的机器上为 8MB)。分片成更多、更小的文件需要更多的内存;我假设因为每个文件都有一个缓冲区,所以在刷新到磁盘之前有更多的缓冲区保存在行上:
# 个输出文件 | 已用内存(MB) |
---|---|
10 | 8 |
100 | 15 |
1000 | 68 |
原始打开的文件未关闭,因此在读取之前更改不会刷新到文件中。 这是经过细微更改的固定代码:
import pandas as pd
import random
import csv
data = pd.DataFrame({'col1': list(range(10)), 'col2': list(range(10, 20))})
data.to_csv('test_file.csv', index=False)
n_chunks = 2
file_names = [f'test_batch_{batch_no}.csv' for batch_no in range(n_chunks)]
# Open files
files = [open(file_name, 'w') for file_name in file_names]
# use files
chunks = [csv.DictWriter(file, ['col1', 'col2']) for file in files]
for chunk in chunks:
chunk.writeheader()
with open('test_file.csv', newline='') as data:
reader = csv.DictReader(data)
for line in reader:
random.choice(chunks).writerow(line)
# close files
for file in files:
file.close()
for file_name in file_names:
df = pd.read_csv(file_name)
print(df)
输出示例:
col1 col2
0 0 10
1 1 11
2 2 12
3 6 16
col1 col2
0 3 13
1 4 14
2 5 15
3 7 17
4 8 18
5 9 19
好吧,只要将大型 csv 的每一行分配给带有标题的随机目标文件,并且只要您不打算打开“太多”文件,那么我会利用
contextlib.ExitStack()
和要写入的文件字典到.
我不确定什么是“太多”打开文件,但我确实使用过这样的代码写入十几个文件,可能更多。
一旦文件被填充并且可能在打乱它们的行之后,您就可以将它们重新加载到 pandas 中以执行您喜欢的任何其他操作。
import csv
import random
import contextlib
TARGET_FILE_COUNT = 10
writers = {}
with contextlib.ExitStack() as stack:
reader = csv.reader(stack.enter_context(open("in.csv", "r", encoding="utf-8")))
headers = next(reader)
for row in reader:
target_file_id = random.randrange(0, TARGET_FILE_COUNT)
if target_file_id not in writers:
writers[target_file_id] = csv.writer(stack.enter_context(open(f"out_{target_file_id}.csv", "w", encoding="utf-8", newline="")))
writers[target_file_id].writerow(headers)
writers[target_file_id].writerow(row)