我有一个文本文件,其中有数十万行,每行都有固定的编号。由竖线 (|) 分隔的属性。 我已按如下方式处理每一行。
StreamReader.ReadLineAsync()
从文件中读取一行。我已经成功地使用
C#
、.NET 8
和 Dapper
(对于 PostgreSQL 16
)按顺序逐行实现了上述过程,并作为 BackgroundService
实现。
但是,为了减少执行时间,是否建议使用
System.Threading.Channels
读取一行就立即处理?
每 100 / 1000 行,执行批量插入/复制语句到数据库中。
请问有更好的建议或方法吗?
编辑:AWS 调用是异步的,PostgreSQL 调用是同步的。一行一行的执行了52行,大约花费了5分钟。 AWS S3 API 调用没有限制。
您不需要生产者/消费者模式实现,这是通过
Channel
s 完成的。
您可以简单地在单独的异步方法中定义单行的整个处理,并为每一行执行此方法。这里的关键是不要等待任务,只需开始工作并读取另一行,启动任务,读取行等等
为此,您需要有一些东西来跟踪正在进行的工作(开始
Task
),例如ConcurrentBag<Task>
。
SemaphoreSlim
,以便不允许(可能)生成数十万个 Task
,这可能会导致 ThreadPool Starvation
类似这样的:
// Specify appropriate value to limit concurrent processing.
private const byte MaxConcurrentExecutions = 10;
private readonly SemaphoreSlim _semaphore = new (MaxConcurrentExecutions);
private readonly ConcurrentBag<Task> _tasks = new();
public async Task HeavyProcessingOfTheFileAsync()
{
while(!reader.EndOfStream)
{
// Wait for semaphore to let in.
try
{
await _semaphore.WaitAsync();
var line = await reader.ReadLineAsync();
_tasks.Add(ProcessLineAsync(line));
}
finally
{
_semaphore.Release();
}
}
}
// Consider implementing IAsyncDisposable to easily await Tasks and dispose of them appropriately.
public async ValueTask DisposeAsync()
{
await Task.WhenAll(_tasks);
}
private async Task ProcessLineAsync(string line)
{
// ...generate DTO, save it in DB, whatever
}