使用多线程/进程解析非常大的文件

问题描述 投票:0回答:1

我正在创建一个 Blazor WebAssembly 应用程序,它基本上读取大约 500,000 行的文件,将每行或每组行分解为一个对象并进行相应的更改),然后将该数组 (MemoryStream) 输出到文件。

目前我循环遍历数组并构建一个长字符串,然后使用

MemoryStream fileMemoryStream = new MemoryStream(encoding.GetByteCount(longString));

如何以最佳方式循环遍历 500,000 行,解析它们并通过多个进程/线程构建此流,并确保最终流的顺序相同(所有行都包含时间戳并且必须按时间顺序排列)?我所拥有的可以工作,但是速度非常慢。

multithreading parsing
1个回答
0
投票

要在 Blazor WebAssembly 应用程序中高效处理 500,000 行,同时保持正确的时间顺序,您可以使用多线程或并行策略来优化您的方法。以下是如何以更好的性能处理此问题,同时确保输出保持有序:

关键概念:

  1. 并行处理:使用多个线程同时处理文件的部分内容。
  2. 有序合并:确保每个线程并行处理行,但按原始顺序输出结果(基于时间戳的时间顺序)。

优化策略:

  1. 对输入数据进行分块

    • 将 500,000 行分割成更小的块。
    • 每个块可以并行处理以减少总体时间。
  2. 使用并行处理

    • 利用
      Task
      Parallel.ForEach
      在多个线程之间分配工作负载。
    • 确保并行处理每个块,而不会阻塞其他线程。
  3. 订单保存:

    • 维护数据结构(如
      ConcurrentQueue
      )来存储结果,同时保留顺序。
    • 并行处理所有块后,将块按原始顺序合并回去。
  4. 避免大字符串连接

    • 大循环中的字符串连接效率可能很低。相反,在组合线条时使用
      StringBuilder
      可以获得更好的性能。
  5. 内存流构建:

    • 不要构建长字符串并将其转换为
      MemoryStream
      ,而是在处理这些行时直接写入
      MemoryStream
      。这可以避免不必要的内存开销。

示例代码(具有并行处理功能的 C#):

using System.IO;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public async Task<MemoryStream> ProcessLargeFileAsync(string[] lines)
{
    int chunkSize = 5000; // Adjust this based on your environment
    int totalLines = lines.Length;

    // Concurrent queue to store processed lines while preserving order
    ConcurrentDictionary<int, string> processedChunks = new ConcurrentDictionary<int, string>();

    // Split the input lines into chunks and process each chunk in parallel
    Parallel.ForEach(Enumerable.Range(0, (int)Math.Ceiling((double)totalLines / chunkSize)), chunkIndex =>
    {
        int startLine = chunkIndex * chunkSize;
        int endLine = Math.Min(startLine + chunkSize, totalLines);

        StringBuilder chunkBuilder = new StringBuilder();

        for (int i = startLine; i < endLine; i++)
        {
            // Process the line (e.g., parse into an object, make changes)
            string processedLine = ProcessLine(lines[i]); // Your processing logic here

            // Append the processed line to the chunk builder
            chunkBuilder.AppendLine(processedLine);
        }

        // Add the processed chunk to the dictionary, ensuring the index order is preserved
        processedChunks.TryAdd(chunkIndex, chunkBuilder.ToString());
    });

    // Combine all processed chunks in the correct order
    StringBuilder finalResult = new StringBuilder();
    for (int i = 0; i < processedChunks.Count; i++)
    {
        finalResult.Append(processedChunks[i]);
    }

    // Create a MemoryStream from the final result
    MemoryStream fileMemoryStream = new MemoryStream(Encoding.UTF8.GetBytes(finalResult.ToString()));

    return fileMemoryStream;
}

private string ProcessLine(string line)
{
    // Parse and modify the line as needed
    return line; // Placeholder for actual processing logic
}

说明:

  1. 块大小:文件被分成可管理的5000行块(根据内存和性能调整)。
  2. 并行处理:并行处理每个块,根据需要修改每一行。
  3. ConcurrentDictionary:确保每个块根据其索引以正确的顺序存储,保留最终输出的顺序。
  4. StringBuilder:用于高效构建最终结果,无需昂贵的字符串连接。

其他提示:

  • 调整块大小:根据可用内存和CPU,调整块大小以平衡内存使用和处理速度。
  • I/O 优化:如果读取或写入文件,请确保使用异步 I/O 操作以避免阻塞线程。

此方法利用并行处理,最大限度地减少字符串连接开销,并确保最终的

MemoryStream
根据时间戳正确排序。

© www.soinside.com 2019 - 2024. All rights reserved.