How to efficiently deserialize a compressed list of objects, add to it, and re-compress it without using too much memory


I have inherited some code that performs the following steps:

  1. Start with a byte array of compressed data, stream and decompress it
  2. Deserialize it into a list of objects
  3. Add to the list
  4. Serialize the list
  5. Re-compress the data back into a byte array
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.IO;

private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new RecyclableMemoryStreamManager();

static async Task Main(string[] args)
{
    using (FileStream fileStream = new FileStream("C:\\data.txt", FileMode.Open, FileAccess.Read))
    {
        var dataList = await DecompressData(fileStream);
        dataList.Add(new SipTraceRecord { });
        using(var stream = MemoryStreamManager.GetStream())
        {
            await JsonSerializer.SerializeAsync(stream, dataList);
            stream.Position = 0;
            var b = await CompressData(stream);
        }
    }

    Console.WriteLine("All done");
}

private static async Task<List<SipTraceRecord>> DecompressData(Stream data)
{
    // Decompress on the fly while deserializing; dispose the GZipStream when done.
    using (var gzip = new GZipStream(data, CompressionMode.Decompress))
    {
        List<SipTraceRecord> recordsList = await JsonSerializer.DeserializeAsync<List<SipTraceRecord>>(gzip);
        return recordsList;
    }
}

private static async Task<byte[]> CompressData(Stream data)
{
    byte[] compressedData;
    using (var ms = MemoryStreamManager.GetStream())
    {
        using (GZipStream gzip = new GZipStream(ms, CompressionMode.Compress, leaveOpen: true))
        {
            data.CopyTo(gzip);
        }

        // The GZipStream must be disposed before reading the result so its final
        // block is flushed; ToArray() copies only the bytes actually written,
        // whereas GetBuffer() would return the whole pooled buffer, including
        // unused capacity.
        compressedData = ms.ToArray();
    }

    return compressedData;
}

This is already quite different from the code I started with; I'm just trying to optimize its memory use as far as possible. The test input file is 600 KB, which is 22 MB decompressed; the process previously used 100 MB of memory and is now down to 90 MB. There are still areas of high memory usage, for example:

await JsonSerializer.SerializeAsync(stream, dataList);
uses 10 MB to write the data to the stream. Can this be optimized like the other direction, i.e. with no byte array, just streaming as needed?

data.CopyTo(gzip);
also copies the data, but the data is compressed at that point, so it only uses < 1 MB.
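Could I instead serialize directly into the GZipStream, so that only compressed bytes are ever buffered? Something like this sketch (SerializeAndCompress is just a placeholder name):

private static async Task<byte[]> SerializeAndCompress(List<SipTraceRecord> dataList)
{
    using (var ms = MemoryStreamManager.GetStream())
    {
        using (var gzip = new GZipStream(ms, CompressionMode.Compress, leaveOpen: true))
        {
            // The JSON bytes are compressed as they are produced, so the
            // uncompressed payload is never buffered in a separate stream.
            await JsonSerializer.SerializeAsync(gzip, dataList);
        }
        // Dispose the GZipStream first so the final compressed block is flushed.
        return ms.ToArray();
    }
}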

c# .net optimization memory-efficient
1 Answer

Instead of deserializing and re-serializing the list, you can use JsonSerializer.DeserializeAsyncEnumerable() to stream through the input data in chunks, decompressing on the fly and re-compressing on the fly to some output stream containing the beginning of a JSON array. Then you can stream and compress the new values, adding them to the JSON array.

Methods to do this would look something like:

using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.IO;

public static class JsonExtensions
{
    public static readonly RecyclableMemoryStreamManager MemoryStreamManager = new RecyclableMemoryStreamManager();
    
    // 2x buffer sized as recommended by Bradley Grainger, https://faithlife.codes/blog/2012/06/always-wrap-gzipstream-with-bufferedstream/
    // But anything smaller than 85,000 bytes should be OK, since objects larger than that go on the large object heap.  See:
    // https://learn.microsoft.com/en-us/dotnet/standard/garbage-collection/large-object-heap
    const int BufferSize = 16384;       

    // Compressed copy + serialize
    public static async Task<byte[]> CopyAndAddToCompressedByteArrayAsync<TItem>(byte[] input, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
    {
        using var inputStream = new MemoryStream(input);
        using var outputStream = MemoryStreamManager.GetStream();
        await CopyAndAddToCompressedStreamAsync(inputStream, outputStream, newItems, options);
        return outputStream.ToArray();
    }
    
    public static async Task CopyAndAddToCompressedFileAsync<TItem>(string inputPath, string outputPath, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
    {
        await using var input = File.OpenRead(inputPath);
        await using var output = File.OpenWrite(outputPath);
        await CopyAndAddToCompressedStreamAsync(input, output, newItems, options);
    }
    
    public static async Task CopyAndAddToCompressedStreamAsync<TItem>(Stream input, Stream output, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
    {
        options ??= JsonSerializerOptions.Default;
        
        await using var inputDecompressor = new GZipStream(input, CompressionMode.Decompress, leaveOpen : true);
        await using var outputCompressor = new GZipStream(output, CompressionMode.Compress, leaveOpen : true);
        await using var outputBuffer = new BufferedStream(outputCompressor, BufferSize);
        await using var writer = new Utf8JsonWriter(outputBuffer, new() { Indented = options.WriteIndented, Encoder = options.Encoder });
        writer.WriteStartArray();
        await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<TItem>(inputDecompressor, options))
        {
            JsonSerializer.Serialize(writer, item, options);
        }
        foreach (var item in newItems)
        {
            JsonSerializer.Serialize(writer, item, options);
        }
        writer.WriteEndArray();
    }

    // Compressed serialize (initial creation)
    public static async Task<byte[]> SerializeToCompressedByteArrayAsync<TValue>(TValue value, JsonSerializerOptions? options = default)
    {
        using (var output = JsonExtensions.MemoryStreamManager.GetStream())
        {
            await JsonExtensions.SerializeToCompressedStreamAsync(output, value, options);
            return output.ToArray();
        }
    }

    public static async Task SerializeToCompressedFileAsync<TValue>(string path, TValue value, JsonSerializerOptions? options = default)
    {
        await using var output = File.OpenWrite(path);
        await SerializeToCompressedStreamAsync(output, value, options);
    }
    
    public static async Task SerializeToCompressedStreamAsync<TValue>(Stream utf8Json, TValue value, JsonSerializerOptions? options = default)
    {
        await using var outputCompressor = new GZipStream(utf8Json, CompressionMode.Compress, leaveOpen : true);
        await using var outputBuffer = new BufferedStream(outputCompressor, BufferSize);
        await JsonSerializer.SerializeAsync<TValue>(outputBuffer, value, options);
    }
}

Now, if you are working entirely with files, you can create the initial JSON file as follows:

List<SipTraceRecord> initialList = /*Initialize this however you do currently */ ;

var options = new JsonSerializerOptions
{
    // Add whatever you need here
};
await JsonExtensions.SerializeToCompressedFileAsync(filePath, initialList, options);

To append to the file, you can do the following:

List<SipTraceRecord> addList = /*Initialize this however you do currently */;

var tempPath = Path.GetTempFileName();
await JsonExtensions.CopyAndAddToCompressedFileAsync(filePath, tempPath, addList, options);
File.Move(tempPath, filePath, true);

Alternatively, if you really do need to work with byte arrays of compressed data, you can create the initial array as follows:

var options = new JsonSerializerOptions
{
    // Add whatever you need here
};
var initialBytes = await JsonExtensions.SerializeToCompressedByteArrayAsync(initialList, options);

And create an appended array like this:

var appendedBytes = await JsonExtensions.CopyAndAddToCompressedByteArrayAsync(initialBytes, list, options);

Notes:

  • Even if you use DeserializeAsyncEnumerable() to stream through a MemoryStream, you must do it asynchronously, because there is no easy-to-use API for streaming through a JSON array synchronously with System.Text.Json.

  • DeserializeAsyncEnumerable() will attempt to read chunks of bytes from the stream whose size equals JsonSerializerOptions.DefaultBufferSize (which defaults to 16,384 bytes), deserialize all the array items in the chunk, then yield them all at once. This prevents unbounded memory growth when streaming through a huge array; the chunk size can be tuned as sketched below.
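A minimal sketch of tuning that chunk size (the value 4,096 here is purely illustrative, not a recommendation):

var options = new JsonSerializerOptions
{
    // Smaller reads lower the peak working set at some cost in throughput;
    // the documented default is 16,384 bytes.
    DefaultBufferSize = 4096,
};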

Demo here.
