I am trying to batch a very large text file (approximately 150 gigabytes) into several smaller text files (approximately 10 gigabytes each).
My general process will be:
# iterate over file one line at a time
# accumulate batch as string
--> # given a certain count that correlates to the size of my current accumulated batch and when that size is met: (this is where I am unsure)
# write to file
# accumulate size count
I have a rough metric for deciding when to batch (when the desired batch size is reached), but am less clear on how I should work out how often to write a given batch to disk. For example, if my batch size is 10 gigabytes, I assume I need to write iteratively rather than hold the entire 10-gigabyte batch in memory. I obviously don't want to write more often than I have to, since that could be quite expensive.
Do you have any rough calculations or tricks you like to use to figure out when to write to disk for a task like this, e.g. size vs. memory or something?
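Roughly, this is a sketch of what I have in mind (the file names and the flush threshold here are just placeholders):

# sketch of the accumulate-and-flush loop outlined above (placeholder names)
flush_size = 64 * 2**20          # flush the in-memory batch every ~64 MiB (a guess)
batch_size = 10_000_000_000      # roll to a new output file around 10 GB

buffer, buffered, written, count = [], 0, 0, 0
infile = open("large_input.txt")            # placeholder input path
outfile = open(f"batch-{count}.txt", "w")

for line in infile:
    buffer.append(line)
    buffered += len(line)
    if buffered >= flush_size:              # write the accumulated batch to disk
        outfile.write("".join(buffer))
        written += buffered
        buffer, buffered = [], 0
        if written >= batch_size:           # current file is big enough, start the next
            outfile.close()
            count += 1
            written = 0
            outfile = open(f"batch-{count}.txt", "w")

outfile.write("".join(buffer))              # flush whatever is left
outfile.close()
infile.close()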
Assuming your big file is simple unstructured text, i.e. this is no good for structured text like JSON, here is an alternative to reading every single line: read large binary bites of the input file until you reach your chunksize, then read a couple of lines, close the current output file, and move on to the next.
I compared this against @tdelaney's line-by-line code adapted to the same chunksize as mine - that code took 250s to split a 12GiB input file into 6x2GiB chunks, whereas this took ~50s, so maybe five times faster. It looks like it is IO bound on my SSD, running at >200MiB/s read and write, where the line-by-line version ran at 40-50MiB/s read and write.
I turned buffering off because there isn't much point to it here. The bite size and the buffering setting may be tweakable to improve performance; I haven't tried any other settings since it appears to be IO bound for me anyway.
import time

outfile_template = "outfile-{}.txt"
infile_name = "large.text"

chunksize = 2_000_000_000
MEB = 2**20   # mebibyte
bitesize = 4_000_000  # the size of the reads (and writes) working up to chunksize

count = 0

starttime = time.perf_counter()

infile = open(infile_name, "rb", buffering=0)
outfile = open(outfile_template.format(count), "wb", buffering=0)
while True:
    byteswritten = 0
    while byteswritten < chunksize:
        bite = infile.read(bitesize)
        # check for EOF
        if not bite:
            break
        outfile.write(bite)
        byteswritten += len(bite)
    # check for EOF
    if not bite:
        break
    for i in range(2):
        l = infile.readline()
        # check for EOF
        if not l:
            break
        outfile.write(l)
    # check for EOF
    if not l:
        break
    outfile.close()
    count += 1
    print( count )
    outfile = open(outfile_template.format(count), "wb", buffering=0)

outfile.close()
infile.close()

endtime = time.perf_counter()
elapsed = endtime - starttime

print( f"Elapsed= {elapsed}" )
Note I haven't exhaustively tested that this doesn't lose data, though there is no evidence that it loses anything - you should validate that yourself.
It might be useful to add some robustness by checking how much data is left to read at the end of a chunk, so you don't end up with the last output file being zero-length (or shorter than the bitesize).
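For example, something along these lines, reusing the names from the snippet above (untested):

import os

def bytes_remaining(f, path):
    # bytes left to read in an already-open binary file
    return os.path.getsize(path) - f.tell()

# In the outer while loop, before opening the next output file, you could
# stop early instead of creating a tiny or zero-length final chunk, e.g.:
#
#     if bytes_remaining(infile, infile_name) < bitesize:
#         outfile.write(infile.read())    # append the tail to the current file
#         break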
HTH, barny
I used a slightly modified version of this for parsing 250GB of JSON. I choose how many smaller files I need (number_of_slices), then I find the positions at which to slice the file (I always look for a line end). Finally, I slice the file with file.seek and file.read(chunk):
import os
import mmap


FULL_PATH_TO_FILE = 'full_path_to_a_big_file'
OUTPUT_PATH = 'full_path_to_a_output_dir'  # where sliced files will be generated


def next_newline_finder(mmapf):
    def nl_find(mmapf):
        while 1:
            current = hex(mmapf.read_byte())
            if hex(ord('\n')) == current:  # or whatever line-end symbol
                return(mmapf.tell())
    return nl_find(mmapf)


# find positions where to slice a file
file_info = os.stat(FULL_PATH_TO_FILE)
file_size = file_info.st_size
positions_for_file_slice = [0]
number_of_slices = 15  # say u want slice the big file to 15 smaller files
size_per_slice = file_size//number_of_slices

with open(FULL_PATH_TO_FILE, "r+b") as f:
    mmapf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    slice_counter = 1
    while slice_counter < number_of_slices:
        pos = size_per_slice*slice_counter
        mmapf.seek(pos)
        newline_pos = next_newline_finder(mmapf)
        positions_for_file_slice.append(newline_pos)
        slice_counter += 1

# create ranges for found positions (from, to)
positions_for_file_slice = [(pos, positions_for_file_slice[i+1]) if i < (len(positions_for_file_slice)-1) else (
    positions_for_file_slice[i], file_size) for i, pos in enumerate(positions_for_file_slice)]

# do actual slice of a file
with open(FULL_PATH_TO_FILE, "rb") as f:
    for i, position_pair in enumerate(positions_for_file_slice):
        read_from, read_to = position_pair
        f.seek(read_from)
        chunk = f.read(read_to-read_from)
        with open(os.path.join(OUTPUT_PATH, f'dummyfile{i}.json'), 'wb') as chunk_file:
            chunk_file.write(chunk)
Here is an example of line-by-line writes. The file is opened in binary mode to avoid the line-decode step, which takes a modest amount of time but can skew character counts. For instance, utf-8 encoding may use multiple bytes on disk for a single python character.
The 4 Meg is a guess at buffering. The idea is to get the operating system to read more of the file at once, reducing seek times. Whether this works, or what the best number to use is, is debatable - and it will differ across operating systems. I found 4 meg made a difference... but that was years ago and things change.
outfile_template = "outfile-{}.txt"
infile_name = "infile.txt"

chunksize = 10_000_000_000
MEB = 2**20   # mebibyte

count = 0
byteswritten = 0
infile = open(infile_name, "rb", buffering=4*MEB)
outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)

try:
    for line in infile:
        if byteswritten > chunksize:
            outfile.close()
            byteswritten = 0
            count += 1
            outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)
        outfile.write(line)
        byteswritten += len(line)
finally:
    infile.close()
    outfile.close()