我的应用程序接收多个文件(50 个文件,每个文件有 1000 行)并在数据流块中处理它们。在其中一个块
UploadToDatabaseBlock
中,我将 MaxDegreeOfParalellism
设置为 8。这将创建多个线程,这些线程使用 EntityFramework 将文件添加到数据库。数据库上下文设置为范围。
由于其范围有限,我得到了例外
A second operation started on this context before a previous operation completed
。因为这些服务使用的存储库对一个 API 请求使用相同的数据库上下文。
这个问题似乎很普遍:如何将数据流块(TPL 多线程)与 EntityFramework 及其 DbContext 一起使用。
Dataflow主方法代码:
ApplicationFileService.cs
public void UploadApplicationFiles(List<AplicationFileDTO> inputFiles)
{
var invalidFiles = new ConcurrentQueue<ApplicationFile>();
var sendFileToBlobBlock = new TransformBlock<ApplicationFileDTO, ApplicationFile>(async appFile =>
{
await azureBlobService.SendBlobAsync(appFile.FileContent);
return ConstructDatabaseFile();
});
var saveFileToDatabaseBlock = new ActionBlock<ApplicationFile>(async appFileDb =>
{
try {
SaveFileToDb(appFileDb); // unitOfWork.FilesRepository.AddFileAsync()
await unitOfWork.FileRowsRepository.SaveRowsToDbAsync(appFileDb.Rows);
await mediator.Send(new MarkFileCompleteRequest(appFileDb.Id));
IsFileComplete(appFileDb);
await unitOfWork.CommitChangesAsync();
}
catch (Exception) {
// unitOfWork rollback
// azureBlobService.DeleteAsync(appFileDb.BlobPath);
invalidFiles.Add(appFileDb);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = SystemInfo.CpuCores });
// Link blocks
// Post each file from input list to first block 'sendFileToBlobBlock'
}
我找到的解决方案:
在 ASP .NET 依赖注入中将 DbContext 设置为 Transient。
为每个线程实例化一个 DbContext。
PRO:我在网上看到这是首选方式。
缺点:
我在上传过程中使用了很多服务。我调用一个私有方法,该方法调用另一个方法,该方法调用不同的服务类等。
我设法通过创建一个静态类 AmbientScope 并在每个方法中提供线程范围的服务来使其工作。
public void IsFileComplete() {
var serviceA = AmbientServiceScope.Current?.GetRequiredService<IServiceA>();
var serviceB = AmbientServiceScope.Current?.GetRequiredService<IServiceB>();
var serviceC = AmbientServiceScope.Current?.GetRequiredService<IServiceC>();
var mediator = AmbientServiceScope.Current?.GetRequiredService<IMediator>();
if (serviceA is null || serviceB is null || ...)
{
throw new InvalidOperationException("No ambient scope available!");
}
// continue method logic
// public method because its used by other services/requests to check a file is complete,
// not only by UploadWithDataflow
}
问题:
using var scope = serviceScopeFactory.CreateScope();
using (AmbientServiceScope.SetScope(scope.ServiceProvider))
{
...
}
这是一个很常见的问题。一定有一个简单的解决方案。
环境范围
public static class AmbientServiceScope
{
private static readonly AsyncLocal<IServiceProvider?> _currentScope = new();
public static IServiceProvider? Current => _currentScope.Value;
public static IDisposable SetScope(IServiceProvider serviceProvider)
{
var previous = _currentScope.Value;
_currentScope.Value = serviceProvider;
return new DisposeAction(() => _currentScope.Value = previous);
}
private class DisposeAction(Action disposeAction) : IDisposable
{
public void Dispose() => disposeAction();
}
}
每个并行线程都需要自己的 DbContext,因为单个作用域上下文不是线程安全的。当线程工作线程需要调用通常访问单个注入的模块级 DbContext 实例的各种方法时,这可能会很麻烦,因为您需要将范围严格的每线程 DbContext 实例传递给每个此类方法。
我不久前采用的工作单元模式实现可以对此提供帮助,因为它的工作方式与您的环境服务范围类似。 DbContextScope。 (https://www.nuget.org/packages/Zejji.DbContextScope.EFCore)
DbContextScope 不是通过容器注入 DbContext,而是分为两部分。 DbContextScopeFactory(创建范围/UoW)和 AmbientDbContextLocator,用于从范围内检索 DbContext 实例。 通常,这对于拥有工作单元/存储库模式很有用,但它对于多线程作用域也非常有效。
这两个依赖项均已在 IoC 容器中注册。每个的范围寿命都很好。当 DbContextScope 需要创建实例时,如果 DbContext 初始化本身需要特定设置,您还可以扩展 DbContextFactory 接口。
给定一个类,该类具有一个线程工作方法,该方法需要为该线程/操作确定 DbContext 的范围,以及几个想要访问该范围 DbContext 的工作方法:
public class ThreadWorker(IDbContextScopeFactory _contextScopeFactory, IAmbientDbContextLocator _contextLocator)
{
private AppDbContext Context => _contextLocator.Get<AppDbContext>();
public async Task DoWork()
{
using var contextScope = _contextScopeFactory.Create();
await doSomething();
await doSomethingElse();
await contextScope.SaveChangesAsync();
}
private async Task doSomething()
{
var foo = await Context.Foos.FirstOrDefaultAsync();
// ...
}
private async Task doSomethingElse()
{
var bar = await Context.Bars.FirstOrDefaultAsync();
// ...
}
}
在这里,您可以通过定位器访问 DbContext,该定位器为您提供范围为任何现有 DbContextScope 的 DbContext。如果您在没有先创建范围的情况下尝试访问“Context”,则会出现异常。这样,任何工作方法或存储库类等都可以访问类似于注入的 DbContext 的单个作用域 DbContext,而工作线程方法控制作用域,确保每个线程都有一个隔离的作用域和生成的 DbContext。