我正在使用 ASP.NET Web API 和 Angular 开发一个应用程序,具有两个主要功能:
文件上传: 用户可以上传文件,上传成功后,服务器会生成这些文件的URL。我需要将这些 URL 发布到 RabbitMQ 队列。
模板创建: 用户可以创建模板,这些模板也需要发布到另一个 RabbitMQ 队列。
问题: 我想先合并两个队列中的消息(文件 URL 和模板详细信息),然后再一起处理。最终目标是在创建模板时发送包含上传文件作为附件的电子邮件。
问题: 以可靠的方式合并来自两个 RabbitMQ 队列的消息的最佳方法是什么? 我应该使用专用的处理服务来消耗来自两个队列的消息,还是有更好的策略? 如何确保仅在成功合并两封邮件后才发送电子邮件? 任何建议或例子将不胜感激!
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
public class AlertMessageConsumerService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly RabbitMQSetting _rabbitMqSetting;
private readonly ILogger<AlertMessageConsumerService> _logger;
private IConnection _connection;
private IModel _channel;
public AlertMessageConsumerService(IOptions<RabbitMQSetting> rabbitMqSetting, IServiceProvider serviceProvider, ILogger<AlertMessageConsumerService> logger)
{
_rabbitMqSetting = rabbitMqSetting.Value;
_serviceProvider = serviceProvider;
_logger = logger;
var factory = new ConnectionFactory
{
HostName = _rabbitMqSetting.HostName,
UserName = _rabbitMqSetting.UserName,
Password = _rabbitMqSetting.Password
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
StartConsuming("alertQueue", stoppingToken);
StartConsuming("anotherQueue", stoppingToken);
await Task.CompletedTask;
}
private void StartConsuming(string queueName, CancellationToken cancellationToken)
{
_channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
bool processedSuccessfully = false;
try
{
processedSuccessfully = await ProcessMessageAsync(message);
}
catch (Exception ex)
{
_logger.LogError($"Exception occurred while processing message from queue {queueName}: {ex}");
}
if (processedSuccessfully)
{
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
_channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
private async Task<bool> ProcessMessageAsync(string message)
{
try
{
using (var scope = _serviceProvider.CreateScope())
{
var emailService = scope.ServiceProvider.GetRequiredService<IEmailSender>();
var vsoService = scope.ServiceProvider.GetRequiredService<IVsoService>();
var alertTypeService = scope.ServiceProvider.GetRequiredService<IAlertTypeService>();
var s3Service = scope.ServiceProvider.GetRequiredService<IFileService>();
var alertMessage = JsonConvert.DeserializeObject<AlertMessage>(message);
var uploadMessage = JsonConvert.DeserializeObject<FileUploadMessage>(message);
if (alertMessage != null && uploadMessage != null)
{
_logger.LogInformation($"AlertMessage: {JsonConvert.SerializeObject(alertMessage)}");
_logger.LogInformation($"FileUploadMessage: {JsonConvert.SerializeObject(uploadMessage)}");
return true;
}
else
{
_logger.LogWarning("Both AlertMessage and FileUploadMessage must be present in the message.");
return false;
}
}
}
catch (JsonException jsonEx)
{
_logger.LogError($"JSON error processing message: {jsonEx.Message}");
return false;
}
catch (Exception ex)
{
_logger.LogError($"Error processing message: {ex.Message}");
return false;
}
}
public override void Dispose()
{
_channel.Close();
_connection.Close();
base.Dispose();
}
}
如果您已经拥有后台处理的基础设施,那么针对此任务的专用流程听起来很合适。一般来说,一旦您进入协调异步后台进程的领域,企业集成模式就是一种宝贵的资源。不要被名字所迷惑;它实际上涉及各种异步处理模式,而不仅仅是企业集成。
在这种情况下,听起来您需要一个聚合器。生成与这两种消息关联的关联 ID,并让聚合器等待具有相同关联 ID 的两种消息。收到两条消息后,请发送电子邮件。
一些框架(最著名的是 NServiceBus)将此称为 Saga,因此您可能还想研究一下。这是他们的文档示例:https://docs.pspecial.net/tutorials/nservicebus-sagas/1-saga-basics/
(郑重声明:我不隶属于 NServiceBus。)