将大型 CSV 随机分片为较小的 CSV

问题描述 投票:0回答:3

我的目标是打乱一个太大而无法立即加载到我的设备 RAM 中的 csv 文件。我的计划是使用 csv.DictReader 遍历整个文件,并使用 csv.DictWriter 将每一行随机写入一组较小文件中的一个,然后打乱所有较小文件,然后再次连接它们。然而,我无法完成第一步,因为较小的文件无法用 pandas 打开,因为它会给出以下错误:

pandas.errors.EmptyDataError: No columns to parse from file

为了执行第一步,我创建了一个 csv DictWriter 文件列表,每个小文件对应一个我想要将主文件拆分成的小文件,以便将主文件的每一行随机分配给其中一个 DictWriter 文件。这是一个虚拟示例,说明了第一步的想法,但在综合生成的 csv 文件上,它给出了相同的错误:

import pandas as pd
import random
import numpy as np
import csv
import os

#First, creating a dummy file just containing integers 0-19 over 2 columns.

data=pd.DataFrame({'col1':list(range(10)),'col2':list(range(10,20))})
data.to_csv('test_file.csv',index=False)

n_chunks=2 #For this example I only split the dummy file into two smaller files

#Next, make a list of DictWriter objects, one for each smaller file
file_names=[f"test_batch_{batch_no}.csv" for batch_no in list(range(n_chunks))]
chunks=[csv.DictWriter(open(file_name,'w'),["col1","col2"]) for file_name in file_names]

#Make headers for each smaller file
for chunk in chunks:
    chunk.writeheader()

#Now, randomly assign each line in test_file.csv to one of the smaller files.
with open("test_file.csv",newline='') as data:
    reader=csv.DictReader(data)
    for line in reader:
        i=random.randint(0,n_chunks-1)
        chunks[i].writerow(line)

for file_name in file_names:
#The next line gives the error.
    chunk=pd.read_csv(file_name)

奇怪的是,较小的文件完全由直接从主文件复制的行组成,在本例中,主文件本身是作为 pandas 文件创建的,并且可以作为 pandas 数据帧加载,没有任何问题。此外,当我检查创建的两个较小文件(“test_batch_0.csv”和“test_batch_1.csv”)时,例如记事本,它们对我来说看起来像常规的 csv 文件,但仍然不知何故 pd.read_csv 无法读取它。

我尝试更改“打开”命令的换行符参数,因为我在之前的帖子中发现“没有要解析表单文件的列”有时是由不正确的换行符引起的,但无济于事。

为了完整起见,以下是完整的错误路径:

Traceback (most recent call last):
  File "[...]/main.py", line 29, in <module>
    chunk=pd.read_csv(file_name)
          ^^^^^^^^^^^^^^^^^^^^^^
  File "[...]/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 1026, in read_csv
    return _read(filepath_or_buffer, kwds)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "[...]/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 620, in _read
    parser = TextFileReader(filepath_or_buffer, **kwds)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "[...]/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 1620, in __init__
    self._engine = self._make_engine(f, self.engine)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "[...]/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 1898, in _make_engine
    return mapping[engine](f, **self.options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "[...].venv/lib/python3.12/site-packages/pandas/io/parsers/c_parser_wrapper.py", line 93, in __init__
    self._reader = parsers.TextReader(src, **kwds)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "parsers.pyx", line 581, in pandas._libs.parsers.TextReader.__cinit__
pandas.errors.EmptyDataError: No columns to parse from file

提前非常感谢!

python pandas csv read-csv
3个回答
1
投票

要预处理较大的文件并将其随机分片为较小的文件,您不需要 Pandas(即使最终分析需要 Pandas)。您可以使用 DictReader 和 DictWriter:

import csv
import random

N = 10                           # the number of sharded output CSVs
OUT_NAME = "output_{i:0>3}.csv"  # pad the file's number with up to three zeroes

out_files = [open(OUT_NAME.format(i=i), "w", newline="") for i in range(N)]

writers = [
    csv.DictWriter(
        out_files[i],
        fieldnames=["Col_1", "Col_2"],
        extrasaction="ignore",  # ignore columns other than Col_1 or Col_2, as read by DictReader
    )
    for i in range(N)
]
for w in writers:
    w.writeheader()


reader = csv.DictReader(open("input.csv", newline=""))
for row in reader:
    w = random.choice(writers)
    w.writerow(row)

我创建了一个大小为 620MB(1_000_000 行 x 100 列)的示例输入 CSV,并在其上运行该程序,最终得到 10 个文件,每个文件大小约为 1.3MB(仅从 100 列中选取 2 个):

137K Nov 17 21:37 output_000.csv
138K Nov 17 21:37 output_001.csv
...  ...          ...
138K Nov 17 21:37 output_008.csv
134K Nov 17 21:37 output_009.csv

创建 10 个文件几乎没有使用任何超出 Python 运行时所需的内存(在我的机器上为 8MB)。分片成更多、更小的文件需要更多的内存;我假设因为每个文件都有一个缓冲区,所以在刷新到磁盘之前有更多的缓冲区保存在行上:

# 个输出文件 已用内存(MB)
10 8
100 15
1000 68

0
投票

原始打开的文件未关闭,因此在读取之前更改不会刷新到文件中。 这是经过细微更改的固定代码:

import pandas as pd
import random
import csv

data = pd.DataFrame({'col1': list(range(10)), 'col2': list(range(10, 20))})
data.to_csv('test_file.csv', index=False)

n_chunks = 2
file_names = [f'test_batch_{batch_no}.csv' for batch_no in range(n_chunks)]
# Open files
files = [open(file_name, 'w') for file_name in file_names]
# use files
chunks = [csv.DictWriter(file, ['col1', 'col2']) for file in files]

for chunk in chunks:
    chunk.writeheader()

with open('test_file.csv', newline='') as data:
    reader = csv.DictReader(data)
    for line in reader:
        random.choice(chunks).writerow(line)

# close files
for file in files:
    file.close()

for file_name in file_names:
    df = pd.read_csv(file_name)
    print(df)

输出示例:

   col1  col2
0     0    10
1     1    11
2     2    12
3     6    16
   col1  col2
0     3    13
1     4    14
2     5    15
3     7    17
4     8    18
5     9    19

0
投票

好吧,只要将大型 csv 的每一行分配给带有标题的随机目标文件,并且只要您不打算打开“太多”文件,那么我会利用

contextlib.ExitStack()
和要写入的文件字典到.

我不确定什么是“太多”打开文件,但我确实使用过这样的代码写入十几个文件,可能更多。

一旦文件被填充并且可能在打乱它们的行之后,您就可以将它们重新加载到 pandas 中以执行您喜欢的任何其他操作。

import csv
import random
import contextlib

TARGET_FILE_COUNT = 10
writers = {}
with contextlib.ExitStack() as stack:
    reader = csv.reader(stack.enter_context(open("in.csv", "r", encoding="utf-8")))
    headers = next(reader)
    for row in reader:
        target_file_id = random.randrange(0, TARGET_FILE_COUNT)

        if target_file_id not in writers:
            writers[target_file_id] = csv.writer(stack.enter_context(open(f"out_{target_file_id}.csv", "w", encoding="utf-8", newline="")))
            writers[target_file_id].writerow(headers)

        writers[target_file_id].writerow(row)
© www.soinside.com 2019 - 2024. All rights reserved.