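I'm trying to read a queue. There are 4 messages in the queue, and each one is tiny, as small as "hello world"! Strangely, it is sometimes slow and sometimes fast, and I'm not sure why. I suspect this is not a good way to browse a queue, but for my purposes I can't consume the queue in the normal way: I need to fetch its contents on demand and then process them. Can anyone help?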
    public async Task<List<string>> GetMessages()
    {
        const string queueName = "myQueue";
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        var queueDeclareResponse = channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        var receivedMessages = new List<string>();
        var tcs = new TaskCompletionSource<List<string>>();
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            receivedMessages.Add(message);
            if (receivedMessages.Count == queueDeclareResponse.MessageCount)
            {
                tcs.SetResult(receivedMessages);
            }
        };
        channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
        // Wait for the consumer to finish processing messages
        var result = await tcs.Task;
        return result; // RETURN TO CALLER TO DO STUFF WITH LIST OF MSGS...
    }
I would do something like this:
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client;

    namespace ConsoleApp1
    {
        public class Program
        {
            public static async Task Main()
            {
                var factory = new ConnectionFactory { HostName = "localhost" };
                using var connection = factory.CreateConnection();
                var all = await GetAllMessagesAsync(connection, "myQueue", CancellationToken.None);
            }

            private static async Task<List<string>> GetAllMessagesAsync(IConnection connection, string queueName, CancellationToken ct)
            {
                await Task.Yield();
                using var model = connection.CreateModel();
                model.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                model.BasicQos(0, 1, false);
                var result = new List<string>();
                while (true)
                {
                    try
                    {
                        // Wait up to 1 second for the next message, polling every 100 ms.
                        var tmp = await GetNextMessageAsync(model, queueName, ct,
                            TimeSpan.FromSeconds(1),
                            TimeSpan.FromMilliseconds(100));
                        if (tmp == null)
                            break; // nothing arrived within the timeout: treat the queue as drained
                        var body = Encoding.UTF8.GetString(tmp.Body.Span);
                        result.Add(body);
                    }
                    catch (OperationCanceledException) when (ct.IsCancellationRequested)
                    {
                        throw;
                    }
                }
                return result;
            }

            private static async Task<BasicGetResult> GetNextMessageAsync(IModel model, string queueName, CancellationToken ct, TimeSpan timeout, TimeSpan checkInterval)
            {
                ct.ThrowIfCancellationRequested();
                // Cancels either when the caller cancels or when the timeout elapses.
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                cts.CancelAfter(timeout);
                while (true)
                {
                    // autoAck is false and the message is never acked, so it goes
                    // back to the queue once the channel is closed.
                    var tmp = model.BasicGet(queueName, false);
                    if (tmp != null)
                        return tmp;
                    try
                    {
                        await Task.Delay(checkInterval, cts.Token).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        // Timed out or cancelled: make one last attempt (may return null).
                        return model.BasicGet(queueName, false);
                    }
                }
            }
        }
    }
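Queues are generally not designed to be a "database" for browsing messages, and RabbitMQ is one such case: it is not designed for pulling messages, and basic.get is inefficient (it does not let you fetch messages in batches), but it should be fine for a UI or a small queue.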
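Since basic.get hands back one message per call, one way to keep an on-demand read bounded is to take a snapshot of the message count first and pull at most that many. Below is a minimal sketch of that idea, not a definitive implementation. `QueueSnapshot`, `ReadAll` and the `tryGet` delegate are hypothetical names introduced here for illustration; `tryGet` stands in for a single basic.get call and is assumed to return null when the queue is currently empty.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class QueueSnapshot
{
    // Pulls at most `limit` messages by calling `tryGet` repeatedly.
    // `tryGet` is a hypothetical stand-in for one basic.get call: it returns
    // the next message body, or null when the queue is currently empty.
    public static List<string> ReadAll(Func<byte[]> tryGet, uint limit)
    {
        var result = new List<string>();
        for (uint i = 0; i < limit; i++)
        {
            var body = tryGet();
            if (body == null)
                break; // queue ran dry before the snapshot count was reached
            result.Add(Encoding.UTF8.GetString(body));
        }
        return result;
    }
}
```

Assuming the same RabbitMQ.Client `IModel` API as in the code above, this could be called as `QueueSnapshot.ReadAll(() => model.BasicGet(queueName, false)?.Body.ToArray(), model.MessageCount(queueName))`. With a limit of 0 the loop returns immediately, so an empty queue costs no polling delay, and a producer that keeps publishing cannot make the read run forever.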
P.S.
In my experience with various queues, RabbitMQ is the worst of them.
If you can, use Kafka.
The comparison here is quite telling: https://www.projectpro.io/article/kafka-vs-rabbitmq/451