在我的 Web 应用程序中,有一些操作需要执行长时间运行的任务,我想在后台调用这些任务。因此,根据 .NET Core 3.1 文档中关于"排队的后台任务"的说明,我使用了如下代码:
/// <summary>
/// Contract for a queue of background work items. Each work item is a delegate that
/// accepts a <see cref="CancellationToken"/> and returns a <see cref="ValueTask"/>.
/// </summary>
public interface IBackgroundTaskQueue
{
/// <summary>Adds a work item to the queue.</summary>
/// <param name="workItem">Delegate to enqueue; it receives a cancellation token when invoked.</param>
/// <returns>A <see cref="ValueTask"/> representing the enqueue operation.</returns>
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);
/// <summary>Retrieves a work item from the queue.</summary>
/// <param name="cancellationToken">Token supplied to the dequeue operation.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> whose result is the dequeued work-item delegate.</returns>
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
}
/// <summary>
/// <see cref="IBackgroundTaskQueue"/> implementation backed by a bounded
/// <see cref="Channel{T}"/> of work-item delegates.
/// </summary>
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    /// <summary>Creates the queue with the given maximum number of pending work items.</summary>
    /// <param name="capacity">Capacity passed to <see cref="BoundedChannelOptions"/>.</param>
    public BackgroundTaskQueue(int capacity)
    {
        // FullMode.Wait: a write against a full channel waits for space instead of dropping an item.
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(
            new BoundedChannelOptions(capacity)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
    }

    /// <summary>Writes a work item to the underlying channel.</summary>
    /// <param name="workItem">Delegate to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _queue.Writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next work item from the underlying channel.</summary>
    /// <param name="cancellationToken">Token passed to the channel read.</param>
    /// <returns>The work-item delegate that was read.</returns>
    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
    {
        return await _queue.Reader.ReadAsync(cancellationToken);
    }
}
和托管服务
/// <summary>
/// Hosted service that repeatedly dequeues work items from an
/// <see cref="IBackgroundTaskQueue"/> and executes them one at a time until the
/// stopping token is signaled.
/// </summary>
public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    /// <summary>Creates the service.</summary>
    /// <param name="taskQueue">Queue from which work items are dequeued.</param>
    /// <param name="logger">Logger used for failure and lifecycle messages.</param>
    public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILogger<QueuedHostedService> logger)
    {
        TaskQueue = taskQueue;
        _logger = logger;
    }

    /// <summary>Queue this service consumes.</summary>
    public IBackgroundTaskQueue TaskQueue { get; }

    /// <summary>Runs the processing loop for the lifetime of the service.</summary>
    /// <param name="stoppingToken">Token signaled when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await BackgroundProcessing(stoppingToken);
    }

    /// <summary>
    /// Dequeues and executes work items until <paramref name="stoppingToken"/> is cancelled.
    /// A failure in one work item is logged and does not stop the loop.
    /// </summary>
    /// <param name="stoppingToken">Token passed to the dequeue call and to every work item.</param>
    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var workItem = await TaskQueue.DequeueAsync(stoppingToken);
                try
                {
                    await workItem(stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // The work item was cancelled because the service is stopping;
                    // that is an orderly shutdown, not an error worth logging.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // The pending dequeue was cancelled by the stopping token: exit the
            // loop normally instead of letting the exception escape ExecuteAsync.
        }
    }

    /// <summary>Logs that the service is stopping, then delegates to the base implementation.</summary>
    /// <param name="stoppingToken">Token forwarded to <see cref="BackgroundService.StopAsync"/>.</param>
    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");
        await base.StopAsync(stoppingToken);
    }
}
然后我注册所有服务
// Register the hosted service that consumes the queue.
services.AddHostedService<QueuedHostedService>();
// Register a single BackgroundTaskQueue instance, built with queueCapacity, as the
// IBackgroundTaskQueue singleton, so the same instance is handed to every consumer of that interface.
services.AddSingleton<IBackgroundTaskQueue>(new BackgroundTaskQueue(queueCapacity));
然后,我可以成功地以不带额外参数的方式使用它,如下例所示:
/// <summary>
/// Creates the tenant for a new company and queues the remaining registration
/// work to run in the background.
/// </summary>
/// <param name="addTenantBo">Input describing the tenant to create; it is serialized into the start log entry.</param>
/// <returns>
/// The tenant returned by <c>CreateTenantAsync</c>, or the initial empty
/// <c>TenantBo</c> when an exception was caught before it was replaced.
/// </returns>
public async Task<TenantBo> RegisterCompanyAsync(AddTenantBo addTenantBo)
{
    var tenantBo = new TenantBo();
    try
    {
        _companyRegistrationLogHelper.SetInfoLog(GetTenantId(tenantBo),
            "Start create company: " + JsonConvert.SerializeObject(addTenantBo));
        InitOnCreateCompanyTasks(tenantBo);
        //skip if already create tenant
        tenantBo = await CreateTenantAsync(tenantBo, addTenantBo);
        //run in background
        // Await the enqueue: an un-awaited ValueTask would hide enqueue failures from the
        // catch block below and would let this method return before the item is actually queued.
        await _companyRegistationQueue.QueueBackgroundWorkItemAsync(RunRegistrationCompanyMainAsync);
        return tenantBo;
    }
    catch (Exception e)
    {
        //some logs
        return tenantBo;
    }
}
/// <summary>
/// Registration work item that takes only a cancellation token. This overload matches the
/// delegate shape (CancellationToken in, ValueTask out) accepted by the background queue,
/// so it can be passed to the queue as a method group.
/// </summary>
/// <param name="cancellationToken">Token supplied by whoever invokes the work item.</param>
private async ValueTask RunRegistrationCompanyMainAsync(CancellationToken cancellationToken)
{
//some await Tasks
}
/// <summary>
/// Registration work item that additionally takes a tenant id. Because of the extra
/// parameter it does not match the (CancellationToken in, ValueTask out) delegate shape
/// directly and cannot be passed to the queue as a bare method group.
/// </summary>
/// <param name="tenantId">Tenant identifier supplied by the caller.</param>
/// <param name="cancellationToken">Token supplied by whoever invokes the work item.</param>
private async ValueTask RunRegistrationCompanyMainAsync(string tenantId, CancellationToken cancellationToken)
{
//some await Tasks
}
所以我只能调用带一个参数的 RunRegistrationCompanyMainAsync(CancellationToken cancellationToken),而不能调用带两个参数的 RunRegistrationCompanyMainAsync(string tenantId, CancellationToken cancellationToken)。
你可以帮我传递字符串参数作为此任务的参数吗?
在
QueueBackgroundWorkItemAsync(RunRegistrationCompanyMainAsync)
调用中,编译器实际上执行的是从方法组到委托的转换。但是,要提供 Func
委托实例,您并不限于使用方法组,也可以提供 lambda 表达式,例如:
var someTenantId = ....
.....
// The lambda takes the cancellation token as ct and forwards it, together with the
// captured someTenantId, to the two-parameter overload; the lambda itself has the
// single-parameter shape the queue accepts.
// NOTE(review): the ValueTask returned by QueueBackgroundWorkItemAsync is not awaited in this snippet.
_companyRegistationQueue.QueueBackgroundWorkItemAsync(ct => RunRegistrationCompanyMainAsync(someTenantId, ct));
过了一段时间,我找到了解决方案:只需像下面这样使用元组。
/// <summary>
/// Bounded, channel-backed queue whose entries pair a <c>CreateCompanyModel</c> with the
/// delegate that will process it.
/// </summary>
public class CompanyRegistationQueue : ICompanyRegistationQueue
{
    private readonly Channel<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>> _queue;

    /// <summary>Creates the queue with the given maximum number of pending entries.</summary>
    /// <param name="capacity">Capacity passed to <see cref="BoundedChannelOptions"/>.</param>
    public CompanyRegistationQueue(int capacity)
    {
        // FullMode.Wait: a write against a full channel waits for space instead of dropping an entry.
        var options = new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait };
        _queue = Channel.CreateBounded<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>>(options);
    }

    /// <summary>Writes a (model, delegate) pair to the underlying channel.</summary>
    /// <param name="workItem">Pair to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public async ValueTask QueueBackgroundWorkItemAsync(Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>> workItem)
    {
        if (workItem == null) throw new ArgumentNullException(nameof(workItem));
        await _queue.Writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next (model, delegate) pair from the underlying channel.</summary>
    /// <param name="cancellationToken">Token passed to the channel read.</param>
    /// <returns>The pair that was read.</returns>
    public async ValueTask<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>> DequeueAsync(CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);
        return workItem;
    }
}
然后这样调用它
/// <summary>
/// Dequeues (model, delegate) pairs and invokes each delegate with its model until
/// <paramref name="stoppingToken"/> is cancelled. A failure in one work item is logged
/// and does not stop the loop.
/// </summary>
/// <param name="stoppingToken">Token passed to the dequeue call and to every work item.</param>
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(stoppingToken);
            try
            {
                // Item1 is the model; Item2 is the delegate that processes it.
                await workItem.Item2(workItem.Item1, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // The work item was cancelled because the service is stopping;
                // that is an orderly shutdown, not an error worth logging.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }
    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
    {
        // The pending dequeue was cancelled by the stopping token: leave the
        // loop normally instead of letting the exception escape to the caller.
    }
}
在代码中调用
// Pair the model with the method that will process it, then hand the pair to the queue.
var paramValue = Tuple.Create<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>(
    createCompanyModel,
    RunRegistrationCompanyMainAsync);
await _companyRegistationQueue.QueueBackgroundWorkItemAsync(paramValue);
附注:元组可能不是最好的解决方案,但它确实可行。
我倾向于回避使用 Tuple,因为
foo.Item1
语法并不理想。因此,我通过创建如下记录解决了这个问题:
/// <summary>
/// Queue entry that bundles a <c>Foo</c> value with a delegate accepting a <c>Foo</c>
/// and a <see cref="CancellationToken"/> and returning a <see cref="ValueTask"/>.
/// </summary>
/// <param name="MyFoo">The <c>Foo</c> value carried by this entry (presumably the argument meant for <paramref name="WorkItem"/> — confirm against the consumer).</param>
/// <param name="WorkItem">The delegate carried by this entry.</param>
public record QueueItem(Foo MyFoo, Func<Foo, CancellationToken, ValueTask> WorkItem);
然后我可以调整队列类中
Channel
对象的签名:
/// <summary>
/// Queue of <c>QueueItem</c> entries backed by an unbounded <see cref="Channel{T}"/>.
/// </summary>
public class MyQueue : IMyQueue
{
    private readonly Channel<QueueItem> _queue = Channel.CreateUnbounded<QueueItem>();

    /// <summary>Writes an entry to the underlying channel.</summary>
    /// <param name="queueItem">Entry to enqueue.</param>
    public async ValueTask QueueAsync(QueueItem queueItem)
    {
        await _queue.Writer.WriteAsync(queueItem);
    }

    /// <summary>Reads the next entry from the underlying channel.</summary>
    /// <param name="cancellationToken">Token passed to the channel read.</param>
    /// <returns>The entry that was read.</returns>
    public async ValueTask<QueueItem> DequeueAsync(CancellationToken cancellationToken)
    {
        var next = await _queue.Reader.ReadAsync(cancellationToken);
        return next;
    }
}
然后,在后台服务中,我可以像这样(简化)调用
Func
:
/// <summary>
/// Processing loop: dequeues one job, invokes its delegate with the job's payload,
/// then waits five seconds before the next iteration, until
/// <paramref name="stoppingToken"/> is cancelled.
/// </summary>
/// <param name="stoppingToken">Token passed to the dequeue call, the work item and the delay.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        // Get the first job in the queue and process it
        var job = await _queue.DequeueAsync(stoppingToken);
        if (job is not null)
        {
            // QueueItem declares its payload property as MyFoo (there is no Foo member).
            await job.WorkItem(job.MyFoo, stoppingToken);
        }
        // Always delay further processing for 5 seconds
        await Task.Delay(5000, stoppingToken);
    }
}