我有一个文本文件,其中有数十万行,每行都有固定的编号。由竖线 (|) 分隔的属性。 我已按如下方式处理每一行。
StreamReader.ReadLineAsync()
从文件中读取一行。我已经成功地使用
C#
、.NET 8
和 Dapper
(对于 PostgreSQL 16
)按顺序逐行实现了上述过程,并作为 BackgroundService
实现。
但是,为了减少执行时间,是否建议使用
System.Threading.Channels
读取一行就立即处理?
每 100 / 1000 行,执行批量插入/复制语句到数据库中。
请问有更好的建议或方法吗?
您不需要生产者/消费者模式实现,这是通过
Channel
s 完成的。
您可以简单地在单独的异步方法中定义单行的整个处理,并为每一行执行此方法。这里的关键是不要等待任务,只需开始工作并读取另一行,启动任务,读取行等等
为此,您需要有一些东西来跟踪正在进行的工作(开始
Task
),例如ConcurrentBag<Task>
。
类似这样的:
private readonly ConcurrentBag<Task> _tasks = new();
public async Task HeavyProcessingOfTheFileAsync()
{
while(!reader.EndOfStream)
{
// Create async task to read next line and
// attach continuation to process the line.
// Encapsulating Task will be stored in collection,
// allowing us to abort/await the work.
_tasks.Add(reader.ReadLineAsync().ContinueWith(t => ProcessLineAsync(t.Result)))
}
}
// Consider implementing IAsyncDisposable to easily await Tasks and dispose of them appropriately.
public async ValueTask DisposeAsync()
{
await Task.WhanAll(_tasks);
}
private async Task ProcessLineAsync(string line)
{
// ...generate DTO, save it in DB, whatever
}