数据流线程安全数据库上下文

问题描述 投票:0回答:1

我的应用程序接收多个文件(50 个文件,每个文件有 1000 行)并在数据流块中处理它们。在其中一个块

UploadToDatabaseBlock
中,我将
MaxDegreeOfParalellism
设置为 8。这将创建多个线程,这些线程使用 EntityFramework 将文件添加到数据库。数据库上下文设置为范围。

由于其范围有限,我得到了例外

A second operation started on this context before a previous operation completed
。因为这些服务使用的存储库对一个 API 请求使用相同的数据库上下文。

这个问题似乎很普遍:如何将数据流块(TPL 多线程)与 EntityFramework 及其 DbContext 一起使用。

Dataflow主方法代码:
ApplicationFileService.cs

public void UploadApplicationFiles(List<AplicationFileDTO> inputFiles)
{
    var invalidFiles = new ConcurrentQueue<ApplicationFile>();

    var sendFileToBlobBlock = new TransformBlock<ApplicationFileDTO, ApplicationFile>(async appFile =>
    {
        await azureBlobService.SendBlobAsync(appFile.FileContent);
        return ConstructDatabaseFile();
    });
    var saveFileToDatabaseBlock = new ActionBlock<ApplicationFile>(async appFileDb =>
    {
        try {
            SaveFileToDb(appFileDb); // unitOfWork.FilesRepository.AddFileAsync()
            await unitOfWork.FileRowsRepository.SaveRowsToDbAsync(appFileDb.Rows);
            await mediator.Send(new MarkFileCompleteRequest(appFileDb.Id));
            IsFileComplete(appFileDb);
            await unitOfWork.CommitChangesAsync();
        }
        catch (Exception) {
            // unitOfWork rollback
            // azureBlobService.DeleteAsync(appFileDb.BlobPath);
            invalidFiles.Add(appFileDb);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = SystemInfo.CpuCores });
    // Link blocks
    // Post each file from input list to first block 'sendFileToBlobBlock'
}

我找到的解决方案:

  1. 在 ASP .NET 依赖注入中将 DbContext 设置为 Transient。

  2. 为每个线程实例化一个 DbContext。
    PRO:我在网上看到这是首选方式。
    缺点: 我在上传过程中使用了很多服务。我调用一个私有方法,该方法调用另一个方法,该方法调用不同的服务类等。
    我设法通过创建一个静态类 AmbientScope 并在每个方法中提供线程范围的服务来使其工作。

public void IsFileComplete() {
    var serviceA = AmbientServiceScope.Current?.GetRequiredService<IServiceA>();
    var serviceB = AmbientServiceScope.Current?.GetRequiredService<IServiceB>();
    var serviceC = AmbientServiceScope.Current?.GetRequiredService<IServiceC>();
    var mediator = AmbientServiceScope.Current?.GetRequiredService<IMediator>();
    if (serviceA is null || serviceB is null || ...)
    {
        throw new InvalidOperationException("No ambient scope available!");
    }
    // continue method logic
    // public method because its used by other services/requests to check a file is complete, 
    // not only by UploadWithDataflow
}

问题:

  1. 这是很多样板代码。
  2. 如果多个 API 请求(上传文件、检查文件完成)使用同一个方法,则不使用线程的请求仍然需要创建 Ambient Scope 才能工作。
    我再次需要样板代码并在各处注入 IServiceScopeFactory:
using var scope = serviceScopeFactory.CreateScope();
using (AmbientServiceScope.SetScope(scope.ServiceProvider))
{
...
}

这是一个很常见的问题。一定有一个简单的解决方案。

环境范围

public static class AmbientServiceScope
{
    private static readonly AsyncLocal<IServiceProvider?> _currentScope = new();

    public static IServiceProvider? Current => _currentScope.Value;

    public static IDisposable SetScope(IServiceProvider serviceProvider)
    {
        var previous = _currentScope.Value;
        _currentScope.Value = serviceProvider;

        return new DisposeAction(() => _currentScope.Value = previous);
    }

    private class DisposeAction(Action disposeAction) : IDisposable
    {
        public void Dispose() => disposeAction();
    }
}
c# asp.net entity-framework-core dbcontext tpl-dataflow
1个回答
0
投票

每个并行线程都需要自己的 DbContext,因为单个作用域上下文不是线程安全的。当线程工作线程需要调用通常访问单个注入的模块级 DbContext 实例的各种方法时,这可能会很麻烦,因为您需要将范围严格的每线程 DbContext 实例传递给每个此类方法。

我不久前采用的工作单元模式实现可以对此提供帮助,因为它的工作方式与您的环境服务范围类似。 DbContextScope。 (https://www.nuget.org/packages/Zejji.DbContextScope.EFCore

DbContextScope 不是通过容器注入 DbContext,而是分为两部分。 DbContextScopeFactory(创建范围/UoW)和 AmbientDbContextLocator,用于从范围内检索 DbContext 实例。 通常,这对于拥有工作单元/存储库模式很有用,但它对于多线程作用域也非常有效。

这两个依赖项均已在 IoC 容器中注册。每个的范围寿命都很好。当 DbContextScope 需要创建实例时,如果 DbContext 初始化本身需要特定设置,您还可以扩展 DbContextFactory 接口。

给定一个类,该类具有一个线程工作方法,该方法需要为该线程/操作确定 DbContext 的范围,以及几个想要访问该范围 DbContext 的工作方法:

public class ThreadWorker(IDbContextScopeFactory _contextScopeFactory, IAmbientDbContextLocator _contextLocator)
{
    private AppDbContext Context => _contextLocator.Get<AppDbContext>();

    public async Task DoWork()
    {
        using var contextScope = _contextScopeFactory.Create();
        await doSomething();
        await doSomethingElse();
        await contextScope.SaveChangesAsync();
    }
    
    private async Task doSomething()
    {
        var foo = await Context.Foos.FirstOrDefaultAsync();
        // ...
    }

    private async Task doSomethingElse()
    {
        var bar = await Context.Bars.FirstOrDefaultAsync();
        // ...
    }
}

在这里,您可以通过定位器访问 DbContext,该定位器为您提供范围为任何现有 DbContextScope 的 DbContext。如果您在没有先创建范围的情况下尝试访问“Context”,则会出现异常。这样,任何工作方法或存储库类等都可以访问类似于注入的 DbContext 的单个作用域 DbContext,而工作线程方法控制作用域,确保每个线程都有一个隔离的作用域和生成的 DbContext。

© www.soinside.com 2019 - 2024. All rights reserved.