使用 RabbitMQ 合并 ASP.NET Web API 中的文件上传和模板创建消息

问题描述 投票:0回答:1

我正在使用 ASP.NET Web API 和 Angular 开发一个应用程序,具有两个主要功能:

  1. 文件上传: 用户可以上传文件,上传成功后,服务器会生成这些文件的URL。我需要将这些 URL 发布到 RabbitMQ 队列。

  2. 模板创建: 用户可以创建模板,这些模板也需要发布到另一个 RabbitMQ 队列。

问题: 我想先合并两个队列中的消息(文件 URL 和模板详细信息),然后再一起处理。最终目标是在创建模板时发送包含上传文件作为附件的电子邮件。

问题: 以可靠的方式合并来自两个 RabbitMQ 队列的消息的最佳方法是什么? 我应该使用专用的处理服务来消耗来自两个队列的消息,还是有更好的策略? 如何确保仅在成功合并两封邮件后才发送电子邮件? 任何建议或例子将不胜感激!

using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class AlertMessageConsumerService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQSetting _rabbitMqSetting;
    private readonly ILogger<AlertMessageConsumerService> _logger;

    private IConnection _connection;
    private IModel _channel;

    public AlertMessageConsumerService(IOptions<RabbitMQSetting> rabbitMqSetting, IServiceProvider serviceProvider, ILogger<AlertMessageConsumerService> logger)
    {
        _rabbitMqSetting = rabbitMqSetting.Value;
        _serviceProvider = serviceProvider;
        _logger = logger;

        var factory = new ConnectionFactory
        {
            HostName = _rabbitMqSetting.HostName,
            UserName = _rabbitMqSetting.UserName,
            Password = _rabbitMqSetting.Password
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        StartConsuming("alertQueue", stoppingToken);
        StartConsuming("anotherQueue", stoppingToken);

        await Task.CompletedTask;
    }

    private void StartConsuming(string queueName, CancellationToken cancellationToken)
    {
        _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            bool processedSuccessfully = false;
            try
            {
                processedSuccessfully = await ProcessMessageAsync(message);
            }
            catch (Exception ex)
            {
                _logger.LogError($"Exception occurred while processing message from queue {queueName}: {ex}");
            }

            if (processedSuccessfully)
            {
                _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            else
            {
                _channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
            }
        };

        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

    private async Task<bool> ProcessMessageAsync(string message)
    {
        try
        {
            using (var scope = _serviceProvider.CreateScope())
            {
                var emailService = scope.ServiceProvider.GetRequiredService<IEmailSender>();
                var vsoService = scope.ServiceProvider.GetRequiredService<IVsoService>();
                var alertTypeService = scope.ServiceProvider.GetRequiredService<IAlertTypeService>();
                var s3Service = scope.ServiceProvider.GetRequiredService<IFileService>();

                var alertMessage = JsonConvert.DeserializeObject<AlertMessage>(message);
                var uploadMessage = JsonConvert.DeserializeObject<FileUploadMessage>(message);

                if (alertMessage != null && uploadMessage != null)
                {
                    _logger.LogInformation($"AlertMessage: {JsonConvert.SerializeObject(alertMessage)}");
                    _logger.LogInformation($"FileUploadMessage: {JsonConvert.SerializeObject(uploadMessage)}");

                    return true;
                }
                else
                {
                    _logger.LogWarning("Both AlertMessage and FileUploadMessage must be present in the message.");
                    return false;
                }
            }
        }
        catch (JsonException jsonEx)
        {
            _logger.LogError($"JSON error processing message: {jsonEx.Message}");
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogError($"Error processing message: {ex.Message}");
            return false;
        }
    }

    public override void Dispose()
    {
        _channel.Close();
        _connection.Close();
        base.Dispose();
    }
}
c# asp.net-mvc asp.net-web-api rabbitmq spring-rabbit
1个回答
0
投票

如果您已经拥有后台处理的基础设施,那么针对此任务的专用流程听起来很合适。一般来说,一旦您进入协调异步后台进程的领域,企业集成模式就是一种宝贵的资源。不要被名字所迷惑;它实际上涉及各种异步处理模式,而不仅仅是企业集成。

在这种情况下,听起来您需要一个聚合器。生成与这两种消息关联的关联 ID,并让聚合器等待具有相同关联 ID 的两种消息。收到两条消息后,请发送电子邮件。

一些框架(最著名的是 NServiceBus)将此称为 Saga,因此您可能还想研究一下。这是他们的文档示例:https://docs.pspecial.net/tutorials/nservicebus-sagas/1-saga-basics/

(郑重声明:我不隶属于 NServiceBus。)

© www.soinside.com 2019 - 2024. All rights reserved.