I inherited some code that performs the following steps:
using System.IO.Compression;
using System.Text.Json;
using Microsoft.IO;

private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new RecyclableMemoryStreamManager();

static async Task Main(string[] args)
{
    using (FileStream fileStream = new FileStream("C:\\data.txt", FileMode.Open, FileAccess.Read))
    {
        var dataList = await DecompressData(fileStream);
        dataList.Add(new SipTraceRecord { });
        using (var stream = MemoryStreamManager.GetStream())
        {
            await JsonSerializer.SerializeAsync(stream, dataList);
            stream.Position = 0;
            var b = await CompressData(stream);
        }
    }
    Console.WriteLine("All done");
}
private static async Task<List<SipTraceRecord>> DecompressData(Stream data)
{
    // Decompress and deserialize directly from the source stream.
    await using var gzip = new GZipStream(data, CompressionMode.Decompress);
    List<SipTraceRecord> recordsList = await JsonSerializer.DeserializeAsync<List<SipTraceRecord>>(gzip) ?? new List<SipTraceRecord>();
    return recordsList;
}
private static async Task<byte[]> CompressData(Stream data)
{
    byte[] compressedData;
    using (var ms = MemoryStreamManager.GetStream())
    {
        using (GZipStream gzip = new GZipStream(ms, CompressionMode.Compress, leaveOpen: true))
        {
            await data.CopyToAsync(gzip);
        }
        // Read the stream only after the GZipStream is disposed, so the final
        // compressed blocks have been flushed; ToArray() returns an array sized
        // exactly to the stream's contents.
        compressedData = ms.ToArray();
    }
    return compressedData;
}
This is quite different from where the code started, and I'm just trying to memory-optimize it as much as possible. The test input file is 600 KB, which decompresses to 22 MB; the program previously used 100 MB of memory, and it is now down to 90 MB. There are still areas of high memory usage, e.g.
await JsonSerializer.SerializeAsync(stream, dataList);
uses 10 MB writing the data to the stream. Can this be optimized in the same way as the other direction, i.e. with no byte arrays, just streaming the data as needed?
data.CopyTo(gzip);
also copies data, but the data is already compressed at that point, so it only uses < 1 MB.
Rather than deserializing and re-serializing the list, you could use JsonSerializer.DeserializeAsyncEnumerable() to stream the input data in chunks, decompressing it on the fly, and compress on the fly to some output stream that contains the beginning of the JSON array. You can then stream and compress the new values, appending them to that same JSON array.
Methods to do this would look like:
using System.IO.Compression;
using System.Text.Json;
using Microsoft.IO;

public static class JsonExtensions
{
    public static readonly RecyclableMemoryStreamManager MemoryStreamManager = new RecyclableMemoryStreamManager();

    // 2x buffer sized as recommended by Bradley Grainger, https://faithlife.codes/blog/2012/06/always-wrap-gzipstream-with-bufferedstream/
    // But anything smaller than 85,000 bytes should be OK, since objects larger than that go on the large object heap. See:
    // https://learn.microsoft.com/en-us/dotnet/standard/garbage-collection/large-object-heap
    const int BufferSize = 16384;

    // Compressed copy + serialize
    public static async Task<byte[]> CopyAndAddToCompressedByteArrayAsync<TItem>(byte[] input, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
    {
        using var inputStream = new MemoryStream(input);
        using var outputStream = MemoryStreamManager.GetStream();
        await CopyAndAddToCompressedStreamAsync(inputStream, outputStream, newItems, options);
        return outputStream.ToArray();
    }

    public static async Task CopyAndAddToCompressedFileAsync<TItem>(string inputPath, string outputPath, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
    {
        await using var input = File.OpenRead(inputPath);
        await using var output = File.OpenWrite(outputPath);
        await CopyAndAddToCompressedStreamAsync(input, output, newItems, options);
    }

    public static async Task CopyAndAddToCompressedStreamAsync<TItem>(Stream input, Stream output, IEnumerable<TItem> newItems, JsonSerializerOptions? options = default)
    {
        options ??= JsonSerializerOptions.Default;
        await using var inputDecompressor = new GZipStream(input, CompressionMode.Decompress, leaveOpen: true);
        await using var outputCompressor = new GZipStream(output, CompressionMode.Compress, leaveOpen: true);
        await using var outputBuffer = new BufferedStream(outputCompressor, BufferSize);
        await using var writer = new Utf8JsonWriter(outputBuffer, new() { Indented = options.WriteIndented, Encoder = options.Encoder });
        writer.WriteStartArray();
        // Stream the existing items through one at a time...
        await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<TItem>(inputDecompressor, options))
        {
            JsonSerializer.Serialize(writer, item, options);
        }
        // ...then append the new items to the same JSON array.
        foreach (var item in newItems)
        {
            JsonSerializer.Serialize(writer, item, options);
        }
        writer.WriteEndArray();
    }

    // Compressed serialize (initial creation)
    public static async Task<byte[]> SerializeToCompressedByteArrayAsync<TValue>(TValue value, JsonSerializerOptions? options = default)
    {
        using (var output = JsonExtensions.MemoryStreamManager.GetStream())
        {
            await JsonExtensions.SerializeToCompressedStreamAsync(output, value, options);
            return output.ToArray();
        }
    }

    public static async Task SerializeToCompressedFileAsync<TValue>(string path, TValue value, JsonSerializerOptions? options = default)
    {
        await using var output = File.OpenWrite(path);
        await SerializeToCompressedStreamAsync(output, value, options);
    }

    public static async Task SerializeToCompressedStreamAsync<TValue>(Stream utf8Json, TValue value, JsonSerializerOptions? options = default)
    {
        await using var outputCompressor = new GZipStream(utf8Json, CompressionMode.Compress, leaveOpen: true);
        await using var outputBuffer = new BufferedStream(outputCompressor, BufferSize);
        await JsonSerializer.SerializeAsync<TValue>(outputBuffer, value, options);
    }
}
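The snippets above and below reference your SipTraceRecord type, which isn't shown in the question; purely so the examples compile, a hypothetical stand-in might look like:

// Hypothetical placeholder: the real SipTraceRecord is not shown in the question.
public class SipTraceRecord
{
    public DateTime Timestamp { get; set; }
    public string? Message { get; set; }
}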
Now, if you are working entirely with files, you can create your initial JSON file as follows:
List<SipTraceRecord> initialList = /* Initialize this however you do currently */;
var options = new JsonSerializerOptions
{
    // Add whatever you need here
};
await JsonExtensions.SerializeToCompressedFileAsync(filePath, initialList, options);
And to append to the file, you can do:
List<SipTraceRecord> addList = /* Initialize this however you do currently */;
var tempPath = Path.GetTempFileName();
await JsonExtensions.CopyAndAddToCompressedFileAsync(filePath, tempPath, addList, options);
File.Move(tempPath, filePath, true);
Alternatively, if you really do need to use byte arrays for your compressed data, you can create the initial array as follows:
var options = new JsonSerializerOptions
{
    // Add whatever you need here
};
var initialBytes = await JsonExtensions.SerializeToCompressedByteArrayAsync(initialList, options);
And create a concatenated array like so:
var appendedBytes = await JsonExtensions.CopyAndAddToCompressedByteArrayAsync(initialBytes, list, options);
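If you want to sanity-check the result, a minimal round-trip sketch (assuming the JsonExtensions class above and the hypothetical SipTraceRecord placeholder) is to decompress and deserialize the appended bytes back into a list:

// Sketch: decompress and deserialize appendedBytes to confirm the array
// now contains both the initial and the appended items.
using var ms = new MemoryStream(appendedBytes);
await using var gzip = new GZipStream(ms, CompressionMode.Decompress);
var roundTripped = await JsonSerializer.DeserializeAsync<List<SipTraceRecord>>(gzip, options);
Console.WriteLine($"Total records: {roundTripped?.Count}");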
Notes:
Even if you are streaming through a MemoryStream, you must do this asynchronously with DeserializeAsyncEnumerable(), because System.Text.Json provides no easy-to-use API for streaming through a JSON array synchronously.
DeserializeAsyncEnumerable() will attempt to read a chunk of bytes from the stream equal in size to JsonSerializerOptions.DefaultBufferSize (whose default value is 16,384 bytes), deserialize all the array items in that chunk, and then yield them all at once. This is what prevents unbounded memory growth while streaming through a huge array.
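If you need an even smaller per-chunk footprint, you can lower that buffer size yourself, trading some throughput for memory; a small sketch (4,096 is an arbitrary choice):

// DefaultBufferSize controls how many bytes DeserializeAsyncEnumerable()
// reads and deserializes per chunk; smaller values reduce peak memory.
var options = new JsonSerializerOptions
{
    DefaultBufferSize = 4096,
};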
Demo here.