我围绕 RabbitMQ 编写了一个快速包装器,但很难发布超过 1 条消息?
如果我有一批 1,000 条消息需要随机添加,我使用 PublishBatch,它只是循环 BasicPublish。
出于某种原因,我最后添加的项目有时只有一半,有时是 90%?
这里发生了什么?如果我添加 100 个项目,我最终得到 21 个?然后35?然后13?
测试:
var messages = new List<string>();
for (var i = 0; i < 100; i++)
{
messages.Add(RandomString(200));
}
Console.WriteLine($"Sending {messages.Count} messages to the queue...");
var sw = Stopwatch.StartNew();
services.GetRequiredService<MessageQueueWrapper>().PublishBatch("some-test-queue", messages);
RabbitMQ 客户端包装器:
public class MessageQueueWrapper : IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
public MessageQueueWrapper(IConnectionFactory connectionFactory)
{
_connection = connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
}
public void QueueDeclare(string queueName, bool durable = true)
{
_channel.QueueDeclare(queue: queueName, durable: durable, exclusive: false, autoDelete: false, arguments: null);
}
public void BasicPublish(string queueName, string message, string exchange = "")
{
var bytes = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange, queueName, null, bytes);
}
public void PublishBatch(string queueName, List<string> messages, string exchange = "")
{
QueueDeclare(queueName);
foreach (var bytes in messages.Select(message => Encoding.UTF8.GetBytes(message)))
{
_channel.BasicPublish(exchange, queueName, null, bytes);
}
}
public void Dispose()
{
_connection.Dispose();
_channel.Dispose();
}
}