Batch processing a very large text file in Python.

Question · votes: 1 · answers: 1

I'm trying to batch a very large text file (about 150 gigabytes) into several smaller text files (about 10 gigabytes each).

My general process would be:

# iterate over the file one line at a time
# accumulate the batch as a string
# keep a running count of the accumulated batch size
--> # when that count indicates the current batch has reached the target size: (this is where I am unsure)
        # write to file

I have a rough counter to track when the desired batch size has been reached, but I'm not sure how often I should be writing a given batch to disk. For example, if my batch size is 10 gigabytes, I assume I need to write to the output repeatedly rather than hold the entire 10-gigabyte batch in memory. At the same time, I obviously don't want to write more often than I have to, since that could get quite expensive.

Do you have any rough calculations or rules of thumb you use to figure out when to write to disk for a task like this, say size versus memory or something like that?
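For reference, here is a rough, untested sketch of the pattern I have in mind (the file names, the 16 MiB flush threshold and the 10 GB batch size are just placeholders): lines are collected in an in-memory list, flushed to the current output file every few MiB, and a new output file is started once roughly 10 GB has been written to the current one.

FLUSH_SIZE = 16 * 2**20        # flush the in-memory buffer every 16 MiB (a guess)
BATCH_SIZE = 10_000_000_000    # start a new output file after ~10 GB

buffer, buffered = [], 0       # accumulated lines and their total size in bytes
written, count = 0, 0          # bytes written to current output file, output index

outfile = open(f"batch-{count}.txt", "wb")
with open("large_input.txt", "rb") as infile:
    for line in infile:
        buffer.append(line)
        buffered += len(line)
        if buffered >= FLUSH_SIZE:        # flush: write accumulated lines, free memory
            outfile.write(b"".join(buffer))
            written += buffered
            buffer, buffered = [], 0
            if written >= BATCH_SIZE:     # rotate: the current batch is full
                outfile.close()
                count += 1
                written = 0
                outfile = open(f"batch-{count}.txt", "wb")

if buffer:                                # write whatever is left over
    outfile.write(b"".join(buffer))
outfile.close()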

python bigdata
1 Answer
1 vote

Assuming your large file is simple, unstructured text (i.e. this is no good for structured text such as JSON), here is an alternative to reading every single line: read large binary bites of the input file until you reach your chunksize, then read a couple of lines (so the split lands on a line boundary), close the current output file and move on to the next one.

I compared this against @tdelaney's line-by-line code: that code took 250 s to split a 12 GiB input file into 6 x 2 GiB chunks, whereas this one took about 50 s, so maybe five times faster, and it looks IO-bound on my SSD, running at >200 MiB/s read and write, where the line-by-line version ran at 40-50 MiB/s read and write.

I turned buffering off because there isn't much point to it here. The bite size and the buffering setting might be worth tuning to improve performance; I haven't tried other settings because it looks IO-bound to me anyway.

import time

outfile_template = "outfile-{}.txt"
infile_name = "large.text"
chunksize = 2_000_000_000
MEB = 2**20   # mebibyte
bitesize = 4_000_000 # the size of the reads (and writes) working up to chunksize

count = 0

starttime = time.perf_counter()

infile = open(infile_name, "rb", buffering=0)
outfile = open(outfile_template.format(count), "wb", buffering=0)

while True:
    byteswritten = 0
    while byteswritten < chunksize:
        bite = infile.read(bitesize)
        # check for EOF
        if not bite:
            break
        outfile.write(bite)
        byteswritten += len(bite)
    # check for EOF
    if not bite:
        break
    for i in range(2):
        l = infile.readline()
        # check for EOF
        if not l:
            break
        outfile.write(l)
    # check for EOF
    if not l:
        break
    outfile.close()
    count += 1
    print( count )
    outfile = open(outfile_template.format(count), "wb", buffering=0)

outfile.close()
infile.close()

endtime = time.perf_counter()

elapsed = endtime-starttime

print( f"Elapsed= {elapsed}" )

Note that I haven't exhaustively tested that this doesn't lose data; there is no evidence that it does, but you should validate it yourself.

It might be worth adding some robustness by checking, at the end of a chunk, how much data is left to read, so you don't end up with a final output file of length 0 (or shorter than bitesize).
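One way to implement that check, as a sketch (the helper name is mine, and it assumes the input file is not growing while you read it): compare the current read offset with the total input size, and skip opening the next output file when nothing is left.

import os

def bytes_remaining(f, path):
    # how much of an already-open binary file is still left to read
    return os.path.getsize(path) - f.tell()

# e.g. just before the "outfile = open(...)" at the bottom of the loop:
#     if bytes_remaining(infile, infile_name) == 0:
#         break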

HTH barny


1 vote

I used a slightly modified version of this for parsing 250 GB of JSON. I choose how many smaller files I need (number_of_slices), then I find the positions at which to slice the file (I always look for a line end). Finally, I slice the file with file.seek and file.read(chunk).

import os
import mmap


FULL_PATH_TO_FILE = 'full_path_to_a_big_file'
OUTPUT_PATH = 'full_path_to_a_output_dir' # where sliced files will be generated


def next_newline_finder(mmapf):
    # scan forward one byte at a time until the next newline,
    # then return the offset just past it
    while True:
        if mmapf.read_byte() == ord('\n'):  # or whatever line-end symbol
            return mmapf.tell()


# find positions where to slice a file
file_info = os.stat(FULL_PATH_TO_FILE)
file_size = file_info.st_size
positions_for_file_slice = [0]
number_of_slices = 15  # say u want slice the big file to 15 smaller files
size_per_slice = file_size//number_of_slices

with open(FULL_PATH_TO_FILE, "r+b") as f:
    mmapf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    slice_counter = 1
    while slice_counter < number_of_slices:
        pos = size_per_slice*slice_counter
        mmapf.seek(pos)
        newline_pos = next_newline_finder(mmapf)
        positions_for_file_slice.append(newline_pos)
        slice_counter += 1

# create ranges for found positions (from, to)
positions_for_file_slice = [
    (pos, positions_for_file_slice[i + 1]) if i < len(positions_for_file_slice) - 1
    else (pos, file_size)
    for i, pos in enumerate(positions_for_file_slice)
]


# do actual slice of a file
with open(FULL_PATH_TO_FILE, "rb") as f:
    for i, position_pair in enumerate(positions_for_file_slice):
        read_from, read_to = position_pair
        f.seek(read_from)
        chunk = f.read(read_to-read_from)
        with open(os.path.join(OUTPUT_PATH, f'dummyfile{i}.json'), 'wb') as chunk_file:
            chunk_file.write(chunk)
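Not part of the original answer, but a quick sanity check you could append after the loop above: since the ranges are contiguous, the sliced files should add up exactly to the size of the input file.

total = sum(os.path.getsize(os.path.join(OUTPUT_PATH, f'dummyfile{i}.json'))
            for i in range(number_of_slices))
assert total == file_size, "sliced files do not add up to the input size"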

0 votes

Here is an example of line-by-line writes. The file is opened in binary mode to avoid the line-decoding step, which takes a modest amount of time but would also skew the character counts; with utf-8 encoding, for example, a single Python character may occupy several bytes on disk.

The 4 MiB buffer is a guess. The idea is to get the OS to read more of the file at once, reducing seek times. Whether this works, and what the best number is, is debatable, and it will differ between operating systems. I found 4 MiB made a difference, but that was years ago and things change.

outfile_template = "outfile-{}.txt"
infile_name = "infile.txt"
chunksize = 10_000_000_000
MEB = 2**20   # mebibyte

count = 0
byteswritten = 0
infile = open(infile_name, "rb", buffering=4*MEB)
outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)

try:
    for line in infile:
        if byteswritten > chunksize:
            outfile.close()
            byteswritten = 0
            count += 1
            outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)
        outfile.write(line)
        byteswritten += len(line)
finally:
    infile.close()
    outfile.close()