这是我的代码的外观:
在startup.cs中,我正在添加托管服务和后台队列
// Single shared queue instance: request-side producers and the hosted worker resolve the same object.
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
// Hosted worker that dequeues and executes the queued work items.
services.AddHostedService<QueuedHostedService>();
然后,我按如下方式实现了作用域服务、托管服务和后台队列:
namespace Services.Services {
    /// <summary>
    /// Hosted background worker that takes work items from an <see cref="IBackgroundTaskQueue"/>
    /// and awaits them one at a time until cancellation is requested.
    /// </summary>
    public class QueuedHostedService : BackgroundService {
        private readonly ILogger _logger;
        private readonly IServiceProvider _serviceProvider;

        /// <summary>
        /// Creates the worker.
        /// </summary>
        /// <param name="serviceProvider">Service provider stored for later use by the worker.</param>
        /// <param name="taskQueue">Queue the worker dequeues work items from.</param>
        /// <param name="loggerFactory">Factory used to create this worker's logger.</param>
        public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
            _serviceProvider = serviceProvider;
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        }

        /// <summary>Queue this worker consumes.</summary>
        public IBackgroundTaskQueue TaskQueue { get; }

        /// <summary>
        /// Dequeues and runs work items sequentially. A failing work item is logged and the
        /// loop continues with the next one.
        /// </summary>
        /// <param name="cancellationToken">Token that ends the loop; it is also passed to each work item.</param>
        protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
            while (!cancellationToken.IsCancellationRequested) {
                var workItem = await TaskQueue.DequeueAsync(cancellationToken);
                try {
                    await workItem(cancellationToken);
                } catch (Exception ex) {
                    // Keep the worker alive, but never drop the failure silently.
                    _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
                }
            }
        }
    }
}
/// <summary>
/// Contract for a queue of asynchronous work items, each represented as a delegate
/// that takes a <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
/// </summary>
public interface IBackgroundTaskQueue
{
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">Delegate to queue.</param>
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    /// <summary>Asynchronously obtains a work item from the queue.</summary>
    /// <param name="cancellationToken">Token passed to the dequeue operation.</param>
    /// <returns>A task whose result is the dequeued work item delegate.</returns>
    Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
namespace Services.Services {
    /// <summary>
    /// <see cref="IBackgroundTaskQueue"/> implementation backed by a <see cref="ConcurrentQueue{T}"/>.
    /// A <see cref="SemaphoreSlim"/> is released once per queued item, so
    /// <see cref="DequeueAsync"/> waits until an item has been queued.
    /// </summary>
    public class BackgroundTaskQueue : IBackgroundTaskQueue {
        // Both fields are assigned exactly once, so they are readonly.
        private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems = new ConcurrentQueue<Func<CancellationToken, Task>>();
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

        /// <summary>Queues a work item and signals one waiting consumer.</summary>
        /// <param name="workItem">Delegate to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
        public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem) {
            if (workItem == null) {
                throw new ArgumentNullException(nameof(workItem));
            }
            _workItems.Enqueue(workItem);
            _signal.Release();
        }

        /// <summary>Waits for the signal, then removes and returns the next work item.</summary>
        /// <param name="cancellationToken">Token that cancels the wait.</param>
        /// <returns>The dequeued work item.</returns>
        public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) {
            await _signal.WaitAsync(cancellationToken);
            _workItems.TryDequeue(out var workItem);
            return workItem;
        }
    }
}
// scoped service
namespace Services.Services {
    /// <summary>
    /// Import service that forwards direct imports to the file processing service and
    /// queues uploaded files for processing on the background task queue.
    /// </summary>
    public class ImportService : BaseService, IImportService {
        private readonly IFileProcessingService _scopedProcessingService;
        private readonly ConfigurationSettings _configurationSettings;
        // Used to create scopes for background work; resolved once in the constructor.
        private readonly IServiceScopeFactory _scopeFactory;

        /// <summary>Queue that background work items are added to.</summary>
        public IBackgroundTaskQueue Queue { get; }

        private const string AZURE_BLOB_CONTAINER = "blobcontainer";

        /// <summary>Service provider this instance was constructed with.</summary>
        public IServiceProvider Services { get; }

        /// <summary>Creates the service.</summary>
        /// <param name="services">Provider used to resolve this service's dependencies.</param>
        /// <param name="queue">Queue that receives the background work items.</param>
        public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services) {
            Services = services;
            _configurationSettings = services.GetService<ConfigurationSettings>();
            _scopedProcessingService = services.GetProcessingService();
            // The queued delegate runs after this service's own scope may have been disposed,
            // so the scope factory is resolved up front instead of calling Services.CreateScope() later.
            _scopeFactory = services.GetRequiredService<IServiceScopeFactory>();
            Queue = queue;
        }

        // ---- Main file
        /// <summary>Imports a file by delegating to the file processing service.</summary>
        public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat) {
            await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat);
        }

        /// <summary>
        /// Prepares an uploaded file (format, temporary path, file name) and queues it for processing.
        /// </summary>
        public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type) {
            var fileFormat = GetFileFormat(file);
            var tempFilePath = await GetTemporaryPath(file);
            var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
            // ....... //
            ProcessFile(tempFilePath, fileFormat, file, type, userId);
        }

        /// <summary>
        /// Queues a background work item that resolves an <see cref="IFileProcessingService"/>
        /// from a fresh scope and imports the temporary file.
        /// </summary>
        private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId) {
            var delimiter = ",";
            // Read the length now so the queued delegate does not hold on to the uploaded file object.
            var fileSize = file.Length;
            Queue.QueueBackgroundWorkItem(async token => {
                using (var scope = _scopeFactory.CreateScope()) {
                    var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IFileProcessingService>();
                    // do the processing
                    // NOTE(review): the "csv" label is carried over unchanged from the original snippet;
                    // replace it with the matching TransactionalDataFileType member - TODO confirm.
                    switch (type) {
                        case "csv":
                            await scopedProcessingService.ImportFile(tempFilePath, fileSize, userId, fileFormat, new Headers(), delimiter ?? ",", "yyyy-MM-dd");
                            break;
                    }
                }
            });
        }
    }
}
我根据控制器中的请求向队列添加元素。现在,我想添加另一个队列来处理其他请求。是否可以让同一个托管服务使用另一个队列?我很难找到说明如何做到这一点的示例。我是否应该只添加另一个作用域服务(scoped service)和另一个后台队列?
第一个选项最简单:您只需创建一组类和接口,例如 QueuedHostedServiceA、QueuedHostedServiceB、IBackgroundTaskQueueA……(可以使用继承来减少代码重复)。
此外,您还可以引入“处理程序”(handler)的概念,并把所有这些类型改写为泛型:
/// <summary>Handles one message of type <typeparamref name="T"/>.</summary>
interface IHandler<T>
{
    /// <summary>Processes a single message.</summary>
    /// <param name="msg">Message to process.</param>
    /// <param name="cancellationToken">Token the handler can observe to stop early.</param>
    Task Handle(T msg, CancellationToken cancellationToken);
}
/// <summary>
/// Queue of messages of type <typeparamref name="T"/>; same shape as the delegate-based
/// task queue, but carrying T instead of Func&lt;CancellationToken, Task&gt;.
/// </summary>
interface IBackgroundMessageQueue<T>
{
    /// <summary>Adds a message to the queue.</summary>
    void QueueBackgroundWorkItem(T message);

    /// <summary>Waits until a message has been queued, then removes and returns it.</summary>
    Task<T> DequeueAsync(CancellationToken cancellationToken);
}

/// <summary>
/// <see cref="IBackgroundMessageQueue{T}"/> backed by a <see cref="ConcurrentQueue{T}"/>;
/// a semaphore is released once per queued message so consumers wait for items.
/// </summary>
class BackgroundMessageQueue<T> : IBackgroundMessageQueue<T>
{
    private readonly ConcurrentQueue<T> _messages = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void QueueBackgroundWorkItem(T message)
    {
        if (message == null)
        {
            throw new ArgumentNullException(nameof(message));
        }
        _messages.Enqueue(message);
        _signal.Release();
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _messages.TryDequeue(out var message);
        return message;
    }
}
/// <summary>
/// Generic hosted worker: dequeues messages of type <typeparamref name="T"/> and passes each one
/// to an <see cref="IHandler{T}"/> resolved from a new scope created for that message.
/// </summary>
class QueuedHostedService<T> : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IBackgroundMessageQueue<T> _queue;
    private readonly ILogger<QueuedHostedService<T>> _logger;

    /// <summary>Creates the worker.</summary>
    /// <param name="serviceProvider">Provider used to create one scope per message.</param>
    /// <param name="queue">Queue the worker reads messages from.</param>
    /// <param name="logger">Logger used to report handler failures.</param>
    /// <remarks>
    /// The handler is deliberately not a constructor parameter: it is resolved inside the
    /// per-message scope, so it can be registered with a scoped lifetime.
    /// </remarks>
    public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundMessageQueue<T> queue, ILogger<QueuedHostedService<T>> logger)
    {
        _serviceProvider = serviceProvider;
        _queue = queue;
        _logger = logger;
    }

    /// <summary>
    /// Processes messages sequentially until cancellation is requested; a failing handler is
    /// logged and the loop moves on to the next message.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            T message = await _queue.DequeueAsync(cancellationToken);
            try
            {
                using (var scope = _serviceProvider.CreateScope())
                {
                    // Resolve from the scope, not from the root provider.
                    var handler = scope.ServiceProvider.GetRequiredService<IHandler<T>>();
                    await handler.Handle(message, cancellationToken);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred handling a {MessageType} message.", typeof(T).Name);
            }
        }
    }
}
并且为每种消息类型创建自己的处理程序:
/// <summary>
/// Message describing one file to process; it carries the same values that
/// ImportService.ProcessFile receives as arguments.
/// </summary>
class ProcessFile
{
    public ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId)
    {
        TempFilePath = tempFilePath;
        FileFormat = fileFormat;
        File = file;
        Type = type;
        UserId = userId;
    }

    // Read-only properties so a handler can read the values it was queued with.
    public string TempFilePath { get; }
    public FileFormatType FileFormat { get; }
    public IFormFile File { get; }
    public TransactionalDataFileType Type { get; }
    public int UserId { get; }
}
/// <summary>Handler for <see cref="ProcessFile"/> messages.</summary>
class FileProcessor : IHandler<ProcessFile>
{
    public Task Handle(ProcessFile msg, CancellationToken cancellationToken)
    {
        // Implement your logic from ImportService.ProcessFile here.
        throw new NotImplementedException();
    }
}
然后您注册所有内容:
// One handler instance per scope.
services.AddScoped<IHandler<ProcessFile>, FileProcessor>();
// One shared queue per message type.
services.AddSingleton<IBackgroundMessageQueue<ProcessFile>, BackgroundMessageQueue<ProcessFile>>();
// One hosted worker per message type.
services.AddHostedService<QueuedHostedService<ProcessFile>>();
并且在您的 ImportService 中,您可以通过构造函数注入相应的消息队列:
public ImportService(IBackgroundMessageQueue<ProcessFile> queue)
并在需要时将消息放入其中。