带有队列处理的.NET核心web api

问题描述 投票:0回答:1

如何设置.NET核心web api

  • 接受一个字符串值,
  • 放入队列
  • 并返回标记该消息被接受(无论它是否被处理)。

此外,还是一个不断检查队列并逐个处理消息的例程。

根据要求,api将充当消息的接收者,其可能在一分钟内达到数百次命中,而其接收的消息应逐个处理。我对web apis有点新意,所以想知道这样的设置是否合适,如果是的话如何组合不同的组件。

提前致谢..

asp.net-core queue asp.net-web-api2
1个回答
0
投票

老实说,我不认为在一个过程中接收和处理消息是有意义的,所以我建议使用外部消息系统,如RabbitMQKafka或您喜欢的任何其他现有系统,您可以在其中放置您的消息和另一个过程会消耗它。这是一个非常大的话题,你可以从this tutorial开始

如果您仍希望在一个进程中拥有它,那么您也可以创建一个后台任务队列,将消息放在那里并创建background task,它将从该队列中使用它们。

public interface IBackgroundTaskQueue
{
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private ConcurrentQueue<Func<CancellationToken, Task>> _workItems = 
        new ConcurrentQueue<Func<CancellationToken, Task>>();
    private SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void QueueBackgroundWorkItem(
        Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);
        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}

后台任务:

public class QueuedHostedService : BackgroundService
{
    private readonly ILogger _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
        ILoggerFactory loggerFactory)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected async override Task ExecuteAsync(
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        while (!cancellationToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(cancellationToken);

            try
            {
                await workItem(cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, 
                   $"Error occurred executing {nameof(workItem)}.");
            }
        }

        _logger.LogInformation("Queued Hosted Service is stopping.");
    }
}

注册:

public void ConfigureServices(IServiceCollection services)
{
    services.AddHostedService<QueuedHostedService>();
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
}

注入控制器:

public class ApiController
{
    private IBackgroundTaskQueue queue;
    public ApiController(IBackgroundTaskQueue queue)
    {
        this.queue = queue;
    }

    public IActionResult StartProcessing()
    {
        queue.QueueBackgroundWorkItem(async token =>
        {
            // put processing code here
        }

        return Ok();
    }
}

您可以修改BackgroundTaskQueue以满足您的要求,但我希望您理解这背后的想法。

© www.soinside.com 2019 - 2024. All rights reserved.