我有大量的大整数,例如可以用下面的代码生成:
import random
# Inclusive bounds of the sample range.
low = 10 ** 10
high = 10 ** 100
# Draw ten million random integers, then sort them in place (ascending).
numbers = [random.randint(low, high) for _ in range(10_000_000)]
numbers.sort()
可以假设它们在保存之前总是会被排序。我的目标是以最节省空间的方式将它们保存到文件中以便长期存储。我尝试过几种方法。
.tar.gz 是我迄今为止得到的压缩效果最好的方法:首先,用下面的代码把数字保存为纯文本
# Write the numbers as decimal text, one per line (no trailing newline).
with open("output.dat", 'w') as f:
    f.write('\n'.join(map(str, numbers)))
得到的文件大小为 963MB,然后使用 tar -czf output.tar.gz output.dat 进行压缩,
最终得到 446MB。
保存为 pickle 的速度很快(速度是次要的,不如空间效率重要),得到的文件大小与 .tar.gz 相当(但略差):
import pickle

# Serialise the whole sorted list into a single binary pickle file.
with open("output.pickle", 'wb') as f:
    pickle.dump(numbers, f)
给了我 460MB。
我还尝试了用 h5py 把数字转换为字节数组,并以 gzip 压缩存入 h5 文件,但这种方法既慢,又不如其他两种方法节省空间。首先需要将数字转换为字节,然后像这样展平数组:
import numpy as np
def flatten(xss):
    """Return one flat list holding the items of every inner iterable of ``xss``.

    The inner iterables are visited in order and their items are appended in
    the order they are yielded; nesting is removed by exactly one level.
    """
    flat = []
    for inner in xss:
        flat.extend(inner)
    return flat
# Each number becomes the list of byte values of its decimal text.
numbers_bytes = [list(str(num).encode()) for num in numbers]
# Digit count of every number; the running total gives the end offset of each
# number inside the flattened byte list.
sections = list(map(len, numbers_bytes))
output_data = flatten(numbers_bytes)
indices_data = np.cumsum(sections)
然后通过以下方式保存:
import h5py
# Store the flattened digit bytes and the per-number end offsets as two
# gzip-compressed datasets (level 9).
with h5py.File("output.h5", 'w') as f:
    f.create_dataset(
        "data",
        data=output_data,
        # output_data is a plain list of byte values (0-255). Without an
        # explicit dtype it is stored with NumPy's default integer type
        # (typically 8 bytes per value), which bloats the dataset.
        dtype="uint8",
        compression="gzip",
        compression_opts=9,
    )
    f.create_dataset(
        "indices",
        data=indices_data,
        compression="gzip",
        compression_opts=9,
    )
得到 670MB。我认为一定有更好的方法将这些数字存储在 h5 文件中。
用户 @RobbyCornelissen 在这个答案中建议了另一种方法,但是我不确定如何在 Python 中实现它。
我用不同的方法进行了一些测试,得到了这样的结果
最终结果:
“Efficient Original”(原始高效)方法:我实现了一种将增量(delta)编码与可变长度数值(VLQ)编码相结合的压缩方法。以下是简要概述:先计算每个数字与前一个数字的差值,再把每个差值编码为 VLQ 字节序列,最后用 gzip 压缩全部字节。
这是核心编码函数的简化版本:
def encode_numbers_original(numbers):
    """Delta-encode ``numbers`` and serialise each delta with ``encode_varint``.

    Every value is replaced by its difference from the previous value (the
    first one by its difference from 0); the encoded deltas are concatenated
    in order.

    Returns:
        A ``bytearray`` with the concatenated encodings.
    """
    out = bytearray()
    last = 0
    for value in numbers:
        out += encode_varint(value - last)
        last = value
    return out
def save_numbers_original(numbers, filename):
    """Encode ``numbers`` with ``encode_numbers_original`` and write the bytes
    to ``filename`` through gzip at compression level 9."""
    payload = encode_numbers_original(numbers)
    with gzip.open(filename, mode='wb', compresslevel=9) as out:
        out.write(payload)
“Advanced”(进阶)方法:这里使用了稍复杂一些的编码:同样先做增量编码,然后为每个差值写入 2 字节的位长度,后接按大端序存储的差值字节,最后用 zlib 压缩。
这是核心编码函数的简化版本:
def compress_numbers(numbers):
    """Delta-encode ``numbers`` into length-prefixed records and zlib-compress them.

    Each record is the delta to the previous value (the first value is taken
    relative to 0): a 2-byte big-endian unsigned bit length followed by the
    delta in the fewest whole bytes, big-endian. A delta of 0 has an empty
    payload.

    Returns:
        The zlib-compressed (level 9) concatenation of all records.
    """
    raw = bytearray()
    last = 0
    for value in numbers:
        delta = value - last
        nbits = delta.bit_length()
        nbytes = math.ceil(nbits / 8)
        raw += struct.pack('>H', nbits)
        raw += delta.to_bytes(nbytes, byteorder='big')
        last = value
    return zlib.compress(raw, level=9)
def save_compressed_numbers(numbers, filename):
    """Write the output of ``compress_numbers(numbers)`` to ``filename`` as a
    binary file."""
    blob = compress_numbers(numbers)
    with open(filename, 'wb') as out:
        out.write(blob)
这是完整的脚本,您可以在其中测试不同的方法:
import random
import time
import os
import pickle
import gzip
import tarfile
import threading
import struct
import zlib
import math
# Functions for the new advanced method
def bit_length(n):
    """Return ``n.bit_length()``: the number of bits needed to represent ``n``
    (0 for ``n == 0``)."""
    width = n.bit_length()
    return width
def encode_number(n):
    """Serialise a non-negative integer as one length-prefixed record.

    The record is a 2-byte big-endian unsigned bit length followed by ``n``
    in the fewest whole bytes, big-endian. ``0`` encodes as the header alone.
    """
    nbits = n.bit_length()
    header = struct.pack('>H', nbits)
    payload = n.to_bytes(math.ceil(nbits / 8), byteorder='big')
    return header + payload
def decode_number(data):
    """Decode one length-prefixed record from the start of ``data``.

    A record is a 2-byte big-endian unsigned bit length followed by the value
    in ``ceil(bits / 8)`` big-endian bytes. Anything after the record is
    ignored.

    Args:
        data: bytes-like object (``bytes``, ``bytearray``, ``memoryview``)
            that starts with a record.

    Returns:
        ``(value, bytes_read)`` where ``bytes_read`` is the record size,
        including the 2-byte header.

    Raises:
        struct.error: if ``data`` holds fewer than 2 bytes.
        ValueError: if ``data`` ends before the announced payload does.
    """
    (bit_count,) = struct.unpack_from('>H', data)
    # Integer ceiling division; avoids the float round trip of math.ceil.
    end = 2 + (bit_count + 7) // 8
    if len(data) < end:
        # Slicing would silently return a short payload and a wrong value.
        raise ValueError(
            f"truncated record: need {end} bytes, only {len(data)} available"
        )
    return int.from_bytes(data[2:end], byteorder='big'), end
def compress_numbers(numbers):
    """Delta-encode ``numbers`` with ``encode_number`` and zlib-compress the result.

    Each value is replaced by its difference from the previous value (the
    first one by its difference from 0) before being serialised.

    Returns:
        The zlib-compressed (level 9) concatenation of the encoded deltas.
    """
    raw = bytearray()
    last = 0
    for value in numbers:
        raw += encode_number(value - last)
        last = value
    return zlib.compress(raw, level=9)
def decompress_numbers(compressed_data):
    """Invert ``compress_numbers``: inflate the data and rebuild the numbers.

    Args:
        compressed_data: zlib-compressed stream of length-prefixed deltas.

    Returns:
        The list of original values, rebuilt by accumulating the deltas
        starting from 0.
    """
    # A memoryview makes every ``view[offset:]`` slice zero-copy. Slicing the
    # bytes object directly copies the remaining buffer for each number,
    # which is quadratic in the payload size.
    view = memoryview(zlib.decompress(compressed_data))
    total = len(view)
    numbers = []
    offset = 0
    prev = 0
    while offset < total:
        diff, bytes_read = decode_number(view[offset:])
        prev += diff
        numbers.append(prev)
        offset += bytes_read
    return numbers
def save_compressed_numbers(numbers, filename):
    """Compress ``numbers`` with ``compress_numbers`` and write the resulting
    bytes to ``filename`` in binary mode."""
    blob = compress_numbers(numbers)
    with open(filename, 'wb') as sink:
        sink.write(blob)
def load_compressed_numbers(filename):
    """Read the whole binary file ``filename`` and return the numbers decoded
    from it by ``decompress_numbers``."""
    with open(filename, 'rb') as src:
        blob = src.read()
    return decompress_numbers(blob)
# Functions for the original efficient method
def encode_varint(number):
    """Encode a non-negative integer as a variable-length quantity (VLQ).

    The value is split into 7-bit groups, least significant group first, one
    group per byte. The high bit (0x80) is set on the final byte only, where
    it marks the end of the value.

    Args:
        number: integer >= 0.

    Returns:
        The encoded ``bytes`` (at least one byte).

    Raises:
        ValueError: if ``number`` is negative. An arithmetic right shift of a
            negative int never reaches 0, so the loop would not terminate.
    """
    if number < 0:
        raise ValueError(f"cannot VLQ-encode a negative number: {number}")
    groups = bytearray()
    while True:
        groups.append(number & 0x7F)
        number >>= 7
        if not number:
            break
    # Flag the last byte so the decoder knows where the value ends.
    groups[-1] |= 0x80
    return bytes(groups)
def decode_varint(stream):
    """Decode one VLQ value from ``stream``.

    Bytes are consumed until one with the high bit (0x80) set is seen; that
    byte ends the value. Each byte contributes its low 7 bits, least
    significant group first. When ``stream`` is an iterator, the bytes after
    the value stay available for the next call.

    Args:
        stream: iterable (normally an iterator) of ints in ``range(256)``.

    Returns:
        The decoded non-negative integer.

    Raises:
        StopIteration: if ``stream`` yields no bytes at all, i.e. the input
            is exhausted. Callers looping over a stream use this as their
            end-of-data signal. Note that inside a generator it would
            surface as ``RuntimeError`` (PEP 479).
        ValueError: if ``stream`` ends in the middle of a value.
    """
    number = 0
    consumed = 0
    for byte in stream:
        number |= (byte & 0x7F) << (7 * consumed)
        consumed += 1
        if byte & 0x80:
            return number
    if consumed:
        raise ValueError("truncated VLQ: stream ended before the final byte")
    # Nothing left to decode: signal end of data instead of returning a
    # bogus 0, which would keep a decode loop running forever.
    raise StopIteration
def encode_numbers_original(numbers):
    """Encode a sequence of numbers using delta encoding and VLQ.

    Each value is replaced by its difference from the previous value (the
    first one by its difference from 0); every delta is serialised with
    ``encode_varint`` and the results are concatenated in order.

    Returns:
        A ``bytearray`` holding the concatenated encodings.
    """
    buffer = bytearray()
    previous = 0
    for current in numbers:
        buffer += encode_varint(current - previous)
        previous = current
    return buffer
def decode_numbers_original(encoded_data):
    """Decode delta + VLQ encoded data back into the list of numbers.

    The input is a concatenation of VLQ values: 7 payload bits per byte,
    least significant group first, with the high bit (0x80) set on the last
    byte of each value. Every decoded value is a delta that is added to the
    running total, starting from 0.

    The bytes are decoded in a single pass here, because looping until a
    ``StopIteration`` from a per-value decoder never terminates when that
    decoder returns 0 on an exhausted stream.

    Args:
        encoded_data: iterable of ints in ``range(256)``, e.g. ``bytes`` or
            ``bytearray``.

    Returns:
        The list of decoded numbers (empty for empty input).

    Raises:
        ValueError: if the data ends in the middle of a value.
    """
    numbers = []
    prev = 0
    delta = 0
    shift = 0
    for byte in encoded_data:
        delta |= (byte & 0x7F) << shift
        if byte & 0x80:
            # Last byte of this value: emit it and reset the accumulator.
            prev += delta
            numbers.append(prev)
            delta = 0
            shift = 0
        else:
            shift += 7
    if shift:
        raise ValueError("truncated VLQ data: last value has no final byte")
    return numbers
def save_numbers_original(numbers, filename):
    """Save encoded numbers to a gzip-compressed file.

    The numbers are encoded with ``encode_numbers_original`` and written to
    ``filename`` through gzip at compression level 9.
    """
    payload = encode_numbers_original(numbers)
    with gzip.open(filename, mode='wb', compresslevel=9) as sink:
        sink.write(payload)
def load_numbers_original(filename):
    """Load and decode numbers from a gzip-compressed file.

    The whole file is decompressed into memory and passed to
    ``decode_numbers_original``, whose result is returned.
    """
    with gzip.open(filename, mode='rb') as src:
        raw = src.read()
    return decode_numbers_original(raw)
# General functions for time measurement and file size
def measure_time(func, timeout=300):
    """Run ``func()`` in a worker thread and measure how long it takes.

    Args:
        func: zero-argument callable to execute.
        timeout: maximum number of seconds to wait for ``func``.

    Returns:
        ``(result, elapsed_seconds)`` when ``func`` finishes in time, or
        ``(None, None)`` when the timeout expires. Callers detect a timeout
        by checking the elapsed time for ``None``.

    Raises:
        Exception: whatever ``Exception`` subclass ``func`` raised, re-raised
            in the calling thread.

    Note:
        A Python thread cannot be cancelled. After a timeout the worker keeps
        running in the background; it is a daemon thread so it cannot keep
        the interpreter alive at exit.
    """
    outcome = {}

    def worker():
        # perf_counter is monotonic, so wall-clock adjustments cannot skew
        # the measurement.
        started = time.perf_counter()
        try:
            outcome["result"] = func()
        except Exception as exc:
            outcome["error"] = exc
        else:
            outcome["elapsed"] = time.perf_counter() - started

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(timeout)
    if thread.is_alive():
        print("Operation was interrupted due to timeout")
        return None, None
    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("result"), outcome.get("elapsed")
def get_file_size(filename):
    """Return the size of ``filename`` in MB, where 1 MB is 1024 * 1024 bytes."""
    bytes_per_mb = 1024 * 1024
    size_in_bytes = os.path.getsize(filename)
    return size_in_bytes / bytes_per_mb
# Compression methods
def method_plain_text(numbers, filename):
    """Benchmark saving ``numbers`` as newline-separated decimal text.

    Returns:
        ``(save_time, file_size)``: the duration reported by ``measure_time``
        and the size of ``filename`` from ``get_file_size``.
    """
    def write_text():
        with open(filename, 'w') as out:
            out.write('\n'.join(str(value) for value in numbers))

    elapsed = measure_time(write_text)[1]
    size = get_file_size(filename)
    return elapsed, size
def method_tar_gz(numbers, filename):
    """Benchmark saving ``numbers`` as text inside a gzip-compressed tar archive.

    The numbers are first written to the scratch file ``temp.dat`` in the
    current directory, which is then added to the archive ``filename`` under
    the name ``output.dat``.

    Returns:
        ``(save_time, file_size)``: the duration reported by ``measure_time``
        and the size of ``filename`` from ``get_file_size``.
    """
    def save():
        try:
            with open("temp.dat", 'w') as f:
                f.write('\n'.join(map(str, numbers)))
            with tarfile.open(filename, "w:gz") as tar:
                tar.add("temp.dat", arcname="output.dat")
        finally:
            # Remove the scratch file even when writing or archiving fails,
            # so a failed run does not leave a large temp.dat behind.
            if os.path.exists("temp.dat"):
                os.remove("temp.dat")

    _, save_time = measure_time(save)
    file_size = get_file_size(filename)
    return save_time, file_size
def method_pickle(numbers, filename):
    """Benchmark saving ``numbers`` with ``pickle.dump``.

    Returns:
        ``(save_time, file_size)``: the duration reported by ``measure_time``
        and the size of ``filename`` from ``get_file_size``.
    """
    def dump_pickle():
        with open(filename, 'wb') as out:
            pickle.dump(numbers, out)

    elapsed = measure_time(dump_pickle)[1]
    size = get_file_size(filename)
    return elapsed, size
def method_efficient_original(numbers, filename):
    """Benchmark ``save_numbers_original`` (the original efficient method).

    Returns:
        ``(save_time, file_size)``: the duration reported by ``measure_time``
        and the size of ``filename`` from ``get_file_size``.
    """
    def run_save():
        save_numbers_original(numbers, filename)

    elapsed = measure_time(run_save)[1]
    size = get_file_size(filename)
    return elapsed, size
def method_advanced(numbers, filename):
    """Benchmark ``save_compressed_numbers`` (the advanced method).

    Returns:
        ``(save_time, file_size)``: the duration reported by ``measure_time``
        and the size of ``filename`` from ``get_file_size``.
    """
    def run_save():
        save_compressed_numbers(numbers, filename)

    elapsed = measure_time(run_save)[1]
    size = get_file_size(filename)
    return elapsed, size
# Main comparison function
def run_comparison(n, low, high):
    """Generate ``n`` sorted random integers and benchmark every storage method.

    The integers are drawn with ``random.randint(low, high)``. Each method
    writes to its own output file; its save time and file size are printed,
    followed by a summary sorted by file size (smallest first). A method
    whose save time is ``None`` is reported as not completed and is left out
    of the summary.
    """
    print(f"Generating {n} numbers in range [{low}, {high}]...")
    numbers = [random.randint(low, high) for _ in range(n)]
    numbers.sort()

    # (display name, benchmark function, output file)
    methods = (
        ("Advanced", method_advanced, "output_advanced.bin"),
        ("Efficient Original", method_efficient_original, "output_efficient_original.gz"),
        ("Pickle", method_pickle, "output.pickle"),
        ("Plain Text", method_plain_text, "output.dat"),
        ("Tar GZ", method_tar_gz, "output.tar.gz"),
    )

    results = []
    for name, method, filename in methods:
        print(f"\nTesting method: {name}")
        save_time, file_size = method(numbers, filename)
        if save_time is None:
            print(f"Method {name} did not complete within the time limit")
            continue
        results.append((name, save_time, file_size))
        print(f"Save time: {save_time:.2f} sec")
        print(f"File size: {file_size:.2f} MB")

    print("\nFinal Results:")
    results.sort(key=lambda entry: entry[2])
    for name, save_time, file_size in results:
        print(f"{name:20} - Time: {save_time:6.2f} sec, Size: {file_size:8.2f} MB")
if __name__ == "__main__":
    # Benchmark configuration: ten million values drawn from [10**10, 10**100].
    n = 10_000_000
    low, high = 10 ** 10, 10 ** 100
    run_comparison(n, low, high)