Efficiently storing a large number of large integers to a file from Python


I have a lot of large integers, generated for example with:

import random

low = 10 ** 10
high = 10 ** 100

numbers = sorted([random.randint(low, high) for _ in range(10000000)])

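It can be assumed that they are always sorted before saving. My goal is to save them to a file in the most space-efficient way for long-term storage. I have tried several approaches.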

  1. Plain text plus .tar.gz has given me the best compression so far:

First, saving with

with open("output.dat", 'w') as f:
    f.write('\n'.join([str(num) for num in numbers]))

results in a 963MB file. Compressing it with

tar -czf output.tar.gz output.dat

finally gave me 446MB.
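Because the archive holds a single file, the tar step is not essential, and the same gzip compression can be applied from Python directly. A minimal sketch, assuming the standard-library gzip module at its highest compression level; the helper names save_text_gz and load_text_gz are illustrative and do not come from the original post:

```python
import gzip

def save_text_gz(numbers, filename):
    """Write one decimal number per line into a gzip-compressed text file."""
    with gzip.open(filename, "wt", compresslevel=9, encoding="ascii") as f:
        f.write("\n".join(map(str, numbers)))

def load_text_gz(filename):
    """Read the numbers back as Python ints, one per line."""
    with gzip.open(filename, "rt", encoding="ascii") as f:
        return [int(line) for line in f]
```

Calling save_text_gz(numbers, "output.dat.gz") should produce a file close in size to the .tar.gz, since tar only adds a small header around the same gzip stream.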

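  2. Saving with pickle is fast (a secondary concern; space efficiency matters more) and produces a file comparable in size to the .tar.gz, though somewhat larger:

import pickle

with open("output.pickle", 'wb') as f:
    pickle.dump(numbers, f)

gave me 460MB.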

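  3. I tried converting the numbers to byte arrays and storing them as h5 using h5py with gzip compression, but this is both slow and less space-efficient than the other two methods.

First I need to convert the numbers to bytes and then flatten the array, like this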

import numpy as np
def flatten(xss):
    return [x for xs in xss for x in xs]
numbers_bytes = [list(bytes(str(num).encode())) for num in numbers]
sections = [len(bl) for bl in numbers_bytes]
output_data = flatten(numbers_bytes)
indices_data = np.cumsum(sections)

and then saving with

import h5py

with h5py.File("output.h5", 'w') as f:
    f.create_dataset(
        "data",
        data=output_data,
        compression="gzip",
        compression_opts=9,
    )
    f.create_dataset(
        "indices",
        data=indices_data,
        compression="gzip",
        compression_opts=9,
    )

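gave me 670MB. I think there must be a better way to store the numbers in h5. User @RobbyCornelissen suggested another approach in this answer, but I am not sure how to implement it in Python.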

python storage pickle tar h5py
1 Answer

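I ran some tests with different methods and got the following results.

Final results:

  • Efficient Original - Time: 46.23 sec, Size: 380.19 MB
  • Advanced - Time: 15.60 sec, Size: 387.75 MB
  • Pickle - Time: 1.63 sec, Size: 419.43 MB
  • Tar GZ - Time: 77.51 sec, Size: 446.14 MB
  • Plain Text - Time: 8.65 sec, Size: 971.69 MB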
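As a rough sanity check on these sizes, they can be compared with an information-theoretic estimate. The sketch below is an approximation, not a measurement: it assumes the values are drawn uniformly at random, approximates the number of distinct sorted lists by the binomial coefficient C(N, n) with log2 C(N, n) ≈ n·log2(N·e/n) (reasonable when n is far smaller than N), and counts one MB as 1024² bytes, as the benchmark script does. The function names are illustrative.

```python
import math

def sorted_list_bound_mb(n, low, high):
    """Approximate minimum size in MB for n sorted uniform draws from [low, high]."""
    span = high - low + 1
    # Stirling-based estimate: log2 C(span, n) ~= n * log2(span * e / n) for n << span
    bits = n * (math.log2(span) - math.log2(n) + math.log2(math.e))
    return bits / 8 / (1024 * 1024)

def fixed_width_mb(n, high):
    """Size in MB when every number is stored in the same number of whole bytes."""
    width = (high.bit_length() + 7) // 8
    return n * width / (1024 * 1024)

print(f"lower-bound estimate: {sorted_list_bound_mb(10_000_000, 10**10, 10**100):.1f} MB")
print(f"fixed-width encoding: {fixed_width_mb(10_000_000, 10**100):.1f} MB")
```

Under these assumptions the estimate comes out at roughly 370 MB, against about 400 MB for a fixed 42-byte encoding, so the two delta-based results listed above are already within about 5% of what is achievable for uniformly random data.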

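Efficient Original method: I implemented a compression method that combines delta encoding with variable-length quantity (VLQ) encoding. A brief overview:

  • We compute the differences between consecutive numbers.
  • These differences are encoded with VLQ, which uses fewer bytes for smaller numbers.
  • The encoded data is then compressed with gzip.

Here is a simplified version of the core encoding functions (encode_varint is defined in the full script below):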

def encode_numbers_original(numbers):
    encoded = bytearray()
    prev = 0
    for num in numbers:
        delta = num - prev
        encoded.extend(encode_varint(delta))
        prev = num
    return encoded

def save_numbers_original(numbers, filename):
    encoded = encode_numbers_original(numbers)
    with gzip.open(filename, 'wb', compresslevel=9) as f:
        f.write(encoded)
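To make the round trip concrete, here is a minimal self-contained sketch of delta plus VLQ encoding together with a matching decoder. It follows the same byte layout as the functions in this answer: 7-bit groups, least significant first, with the high bit set on the last byte of each value (the reverse of the usual LEB128 convention). The names vlq_encode, delta_vlq_encode and delta_vlq_decode are illustrative, and gzip is left out to keep the example short.

```python
def vlq_encode(n):
    """Encode a non-negative int as 7-bit groups, least significant first.

    The high bit (0x80) is set on the final byte only, marking the end of the value.
    """
    out = bytearray()
    while True:
        out.append(n & 0x7F)
        n >>= 7
        if not n:
            break
    out[-1] |= 0x80
    return bytes(out)

def delta_vlq_encode(sorted_numbers):
    """Store each number as the VLQ of its difference from the previous one."""
    encoded = bytearray()
    prev = 0
    for num in sorted_numbers:
        encoded += vlq_encode(num - prev)
        prev = num
    return bytes(encoded)

def delta_vlq_decode(data):
    """Invert delta_vlq_encode by walking the bytes once."""
    numbers = []
    prev = 0
    value = 0
    shift = 0
    for byte in data:
        value |= (byte & 0x7F) << shift
        shift += 7
        if byte & 0x80:  # terminator byte: the current delta is complete
            prev += value
            numbers.append(prev)
            value = 0
            shift = 0
    return numbers

sample = [10**10, 10**10 + 5, 10**50, 10**100]
assert delta_vlq_decode(delta_vlq_encode(sample)) == sample
```

Decoding in a single pass over the bytes avoids having to detect the end of the stream separately, because the loop simply stops when the data runs out.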

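Advanced method: For the advanced method, I used a more elaborate scheme:

  • We still use delta encoding to compute the differences between numbers.
  • Instead of VLQ, we use a custom bit-length encoding scheme.
  • We store the bit length of each difference, followed by the difference itself, using only as many bytes as necessary.
  • The encoded data is compressed with zlib for finer control over compression.

Here is a simplified version of the core encoding functions: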

def compress_numbers(numbers):
    compressed = bytearray()
    prev = 0
    for num in numbers:
        diff = num - prev
        length = diff.bit_length()
        compressed.extend(struct.pack('>H', length))
        compressed.extend(diff.to_bytes(math.ceil(length / 8), byteorder='big'))
        prev = num
    return zlib.compress(compressed, level=9)

def save_compressed_numbers(numbers, filename):
    compressed = compress_numbers(numbers)
    with open(filename, 'wb') as f:
        f.write(compressed)
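Decoding this format needs only an offset that advances through the buffer. The following is a minimal sketch of the record layout and its inverse, with zlib omitted for brevity. The function names pack_deltas and unpack_deltas are illustrative, and the 2-byte length prefix limits each difference to fewer than 65,536 bits, which is ample for values up to 10**100.

```python
import struct

def pack_deltas(sorted_numbers):
    """Write each delta as a 2-byte big-endian bit length followed by its bytes."""
    out = bytearray()
    prev = 0
    for num in sorted_numbers:
        diff = num - prev
        bits = diff.bit_length()
        out += struct.pack(">H", bits)
        out += diff.to_bytes((bits + 7) // 8, "big")
        prev = num
    return bytes(out)

def unpack_deltas(data):
    """Read the records back, tracking an offset instead of re-slicing the tail."""
    numbers = []
    prev = 0
    pos = 0
    while pos < len(data):
        (bits,) = struct.unpack_from(">H", data, pos)
        nbytes = (bits + 7) // 8
        prev += int.from_bytes(data[pos + 2 : pos + 2 + nbytes], "big")
        numbers.append(prev)
        pos += 2 + nbytes
    return numbers

sample = [10**10, 10**10 + 5, 10**50, 10**100]
assert unpack_deltas(pack_deltas(sample)) == sample
```

Reading at an offset keeps decoding linear in the size of the data, which matters once the buffer holds millions of records.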

Here is the full script, with which you can test the different methods:

import random
import time
import os
import pickle
import gzip
import tarfile
import threading
import struct
import zlib
import math

# Functions for the new advanced method
def bit_length(n):
    return n.bit_length()

def encode_number(n):
    length = bit_length(n)
    return struct.pack('>H', length) + n.to_bytes(math.ceil(length / 8), byteorder='big')

def decode_number(data):
    length = struct.unpack('>H', data[:2])[0]
    return int.from_bytes(data[2:2+math.ceil(length / 8)], byteorder='big'), 2 + math.ceil(length / 8)

def compress_numbers(numbers):
    compressed = bytearray()
    prev = 0
    for num in numbers:
        diff = num - prev
        compressed.extend(encode_number(diff))
        prev = num
    return zlib.compress(compressed, level=9)

def decompress_numbers(compressed_data):
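    # memoryview makes the decompressed[i:] slices below zero-copy instead of copying the tail each time
    decompressed = memoryview(zlib.decompress(compressed_data))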
    numbers = []
    i = 0
    prev = 0
    while i < len(decompressed):
        diff, bytes_read = decode_number(decompressed[i:])
        num = prev + diff
        numbers.append(num)
        prev = num
        i += bytes_read
    return numbers

def save_compressed_numbers(numbers, filename):
    compressed = compress_numbers(numbers)
    with open(filename, 'wb') as f:
        f.write(compressed)

def load_compressed_numbers(filename):
    with open(filename, 'rb') as f:
        compressed = f.read()
    return decompress_numbers(compressed)

# Functions for the original efficient method
def encode_varint(number):
    """Encode an integer as a variable-length quantity (VLQ)."""
    encoded = []
    while True:
        encoded.append(number & 0x7f)
        number >>= 7
        if not number:
            break
    encoded[-1] |= 0x80
    return bytes(encoded)

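def decode_varint(stream):
    """Decode one VLQ from a byte iterator; raises StopIteration at end of stream."""
    number = 0
    shift = 0
    while True:
        byte = next(stream)  # raises StopIteration once the stream is exhausted
        number |= (byte & 0x7f) << shift
        shift += 7
        if byte & 0x80:  # high bit marks the final byte of this value
            return number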

def encode_numbers_original(numbers):
    """Encode a list of sorted numbers using delta encoding and VLQ."""
    encoded = bytearray()
    prev = 0
    for num in numbers:
        delta = num - prev
        encoded.extend(encode_varint(delta))
        prev = num
    return encoded

def decode_numbers_original(encoded_data):
    """Decode numbers from encoded data."""
    numbers = []
    prev = 0
    stream = iter(encoded_data)
    try:
        while True:
            delta = decode_varint(stream)
            num = prev + delta
            numbers.append(num)
            prev = num
    except StopIteration:
        pass
    return numbers

def save_numbers_original(numbers, filename):
    """Save encoded numbers to a gzip-compressed file."""
    encoded = encode_numbers_original(numbers)
    with gzip.open(filename, 'wb', compresslevel=9) as f:
        f.write(encoded)

def load_numbers_original(filename):
    """Load and decode numbers from a gzip-compressed file."""
    with gzip.open(filename, 'rb') as f:
        encoded_data = f.read()
    return decode_numbers_original(encoded_data)

# General functions for time measurement and file size
def measure_time(func, timeout=300):
    """Measure execution time of a function with a timeout."""
    result = [None]
    exception = [None]
    execution_time = [None]
    
    def worker():
        try:
            start_time = time.time()
            result[0] = func()
            execution_time[0] = time.time() - start_time
        except Exception as e:
            exception[0] = e
    
    thread = threading.Thread(target=worker)
    thread.start()
    thread.join(timeout)
    
    if thread.is_alive():
        print("Operation was interrupted due to timeout")
        return None, None  # callers test the elapsed time against None to detect a timeout
    
    if exception[0] is not None:
        raise exception[0]
    
    return result[0], execution_time[0]

def get_file_size(filename):
    """Get file size in MB."""
    return os.path.getsize(filename) / (1024 * 1024)

# Compression methods
def method_plain_text(numbers, filename):
    """Save numbers as plain text."""
    def save():
        with open(filename, 'w') as f:
            f.write('\n'.join(map(str, numbers)))
    
    _, save_time = measure_time(save)
    file_size = get_file_size(filename)
    
    return save_time, file_size

def method_tar_gz(numbers, filename):
    """Save numbers using tar.gz compression."""
    def save():
        with open("temp.dat", 'w') as f:
            f.write('\n'.join(map(str, numbers)))
        with tarfile.open(filename, "w:gz") as tar:
            tar.add("temp.dat", arcname="output.dat")
        os.remove("temp.dat")
    
    _, save_time = measure_time(save)
    file_size = get_file_size(filename)
    
    return save_time, file_size

def method_pickle(numbers, filename):
    """Save numbers using pickle."""
    def save():
        with open(filename, 'wb') as f:
            pickle.dump(numbers, f)
    
    _, save_time = measure_time(save)
    file_size = get_file_size(filename)
    
    return save_time, file_size

def method_efficient_original(numbers, filename):
    """Save numbers using the original efficient method."""
    def save():
        save_numbers_original(numbers, filename)
    
    _, save_time = measure_time(save)
    file_size = get_file_size(filename)
    
    return save_time, file_size

def method_advanced(numbers, filename):
    """Save numbers using the advanced method."""
    def save():
        save_compressed_numbers(numbers, filename)
    
    _, save_time = measure_time(save)
    file_size = get_file_size(filename)
    
    return save_time, file_size

# Main comparison function
def run_comparison(n, low, high):
    print(f"Generating {n} numbers in range [{low}, {high}]...")
    numbers = sorted([random.randint(low, high) for _ in range(n)])
    
    methods = [
        ("Advanced", method_advanced, "output_advanced.bin"),
        ("Efficient Original", method_efficient_original, "output_efficient_original.gz"),
        ("Pickle", method_pickle, "output.pickle"),
        ("Plain Text", method_plain_text, "output.dat"),
        ("Tar GZ", method_tar_gz, "output.tar.gz")
    ]
    
    results = []
    
    for name, method, filename in methods:
        print(f"\nTesting method: {name}")
        save_time, file_size = method(numbers, filename)
        if save_time is not None:
            results.append((name, save_time, file_size))
            print(f"Save time: {save_time:.2f} sec")
            print(f"File size: {file_size:.2f} MB")
        else:
            print(f"Method {name} did not complete within the time limit")
    
    print("\nFinal Results:")
    for name, save_time, file_size in sorted(results, key=lambda x: x[2]):
        print(f"{name:20} - Time: {save_time:6.2f} sec, Size: {file_size:8.2f} MB")

if __name__ == "__main__":
    n = 10000000  # 10 million numbers
    low = 10 ** 10
    high = 10 ** 100
    run_comparison(n, low, high)