如何在.NET中并发处理大文本文件?

问题描述 投票:0回答:1

我有一个文本文件,其中有数十万行,每行都有固定的编号。由竖线 (|) 分隔的属性。 我已按如下方式处理每一行。

  1. 使用
    StreamReader.ReadLineAsync()
    从文件中读取一行。
  2. 拆分元素并将它们填充到 DTO 中。
  3. 使用 DTO 进行一些处理。
  4. 如果通过,则为每行/DTO 进行几次 AWS S3 API 调用。
  5. 如果步骤 3 和 4 通过,则在 PostgreSQL 表 A 中插入一行。
  6. 如果步骤 3 和 4 失败,则在 PostgreSQL 表 B 中插入一行。

我已经成功地使用

C#
.NET 8
Dapper
(对于
PostgreSQL 16
)按顺序逐行实现了上述过程,并作为
BackgroundService
实现。

但是,为了减少执行时间,是否建议使用

System.Threading.Channels
读取一行就立即处理? 每 100 / 1000 行,执行批量插入/复制语句到数据库中。

请问有更好的建议或方法吗?

c# .net asynchronous concurrency system.threading.channels
1个回答
0
投票

您不需要生产者/消费者模式实现,这是通过

Channel
s 完成的。

您可以简单地在单独的异步方法中定义单行的整个处理,并为每一行执行此方法。这里的关键是不要等待任务,只需开始工作并读取另一行,启动任务,读取行等等

为此,您需要有一些东西来跟踪正在进行的工作(开始

Task
),例如
ConcurrentBag<Task>

类似这样的:

private readonly ConcurrentBag<Task> _tasks = new();

public async Task HeavyProcessingOfTheFileAsync()
{
    while(!reader.EndOfStream)
    {
        // Create async task to read next line and
        // attach continuation to process the line.
        // Encapsulating Task will be stored in collection,
        // allowing us to abort/await the work.
        _tasks.Add(reader.ReadLineAsync().ContinueWith(t => ProcessLineAsync(t.Result)))
    }
}

// Consider implementing IAsyncDisposable to easily await Tasks and dispose of them appropriately.
public async ValueTask DisposeAsync()
{
    await Task.WhanAll(_tasks);
}

private async Task ProcessLineAsync(string line)
{
    // ...generate DTO, save it in DB, whatever
}
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.