如何在.NET 8中并发处理大型文本文件?

问题描述 投票:0回答:1

我有一个文本文件,其中有数十万行,每行都有固定的编号。由竖线 (|) 分隔的属性。 我已按如下方式处理每一行。

  1. 使用
    StreamReader.ReadLineAsync()
    从文件中读取一行。
  2. 拆分元素并将它们填充到 DTO 中。
  3. 使用 DTO 进行一些处理。
  4. 如果通过,则为每行/DTO 进行几次 AWS S3 API 调用。
  5. 如果步骤 3 和 4 通过,则在 PostgreSQL 表 A 中插入一行。
  6. 如果步骤 3 和 4 失败,则在 PostgreSQL 表 B 中插入一行。

我已经成功地使用

C#
.NET 8
Dapper
(对于
PostgreSQL 16
)按顺序逐行实现了上述过程,并作为
BackgroundService
实现。

但是,为了减少执行时间,是否建议使用

System.Threading.Channels
读取一行就立即处理? 每 100 / 1000 行,执行批量插入/复制语句到数据库中。

请问有更好的建议或方法吗?

编辑:AWS 调用是异步的,PostgreSQL 调用是同步的。一行一行的执行了52行,大约花费了5分钟。 AWS S3 API 调用没有限制。

c# .net asynchronous concurrency system.threading.channels
1个回答
0
投票

您不需要生产者/消费者模式实现,这是通过

Channel
s 完成的。

您可以简单地在单独的异步方法中定义单行的整个处理,并为每一行执行此方法。这里的关键是不要等待任务,只需开始工作并读取另一行,启动任务,读取行等等

为此,您需要有一些东西来跟踪正在进行的工作(开始

Task
),例如
ConcurrentBag<Task>

最后但并非最不重要的一点是,我添加了

SemaphoreSlim
,以便不允许(可能)生成数十万个
Task
,这可能会导致 ThreadPool Starvation

类似这样的:

// Specify appropriate value to limit concurrent processing.
private const byte MaxConcurrentExecutions = 10;
private readonly SemaphoreSlim _semaphore = new (MaxConcurrentExecutions);
private readonly ConcurrentBag<Task> _tasks = new();

public async Task HeavyProcessingOfTheFileAsync()
{
    while(!reader.EndOfStream)
    {
        // Wait for semaphore to let in.
        try
        {
            await _semaphore.WaitAsync();
            var line = await reader.ReadLineAsync();
            _tasks.Add(ProcessLineAsync(line));
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

// Consider implementing IAsyncDisposable to easily await Tasks and dispose of them appropriately.
public async ValueTask DisposeAsync()
{
    await Task.WhenAll(_tasks);
}

private async Task ProcessLineAsync(string line)
{
    // ...generate DTO, save it in DB, whatever
}
© www.soinside.com 2019 - 2024. All rights reserved.