按需浏览RabbitMQ队列并处理之后的数据

问题描述 投票:0回答:1

尝试读取队列。队列中有 4 条消息,每条消息都非常小——就像“hello world”一样小!奇怪的是,有时它很慢,有时它很快,我不确定为什么。我猜这不是浏览队列的好方法,但出于我的目的,我无法以正常方式使用队列 - 我需要按需获取队列的内容,然后再对其进行处理。谁能帮帮我?

public async Task<new List<string>()>> GetMessages()
{
    var factory = new ConnectionFactory { HostName = "localhost"};
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    var queueDeclareResponse = channel.QueueDeclare("myQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

    var receivedMessages = new List<string>();
    var tcs = new TaskCompletionSource<List<string>>();

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());

        receivedMessages.Add(message);

        if (receivedMessages.Count == queueDeclareResponse.MessageCount)
        {
            tcs.SetResult(receivedMessages);
        }
    };

    channel.BasicConsume(queueName, autoAck: false, consumer: consumer);

    // Wait for the consumer to finish processing messages
    var result = await tcs.Task;

    return result; // RETURN TO CALLER TO DO STUFF WITH LIST OF MSGS...
}
c# rabbitmq
1个回答
0
投票

我会做这样的事情:

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace ConsoleApp1
{
    public class Program
    {
        public static async Task Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost"};
            using var connection = factory.CreateConnection();

            var all = await GetAllMessagesAsync(connection, "myQueue", CancellationToken.None);
        }

        private static async Task<List<string>> GetAllMessagesAsync(IConnection connection, string queueName, CancellationToken ct)
        {
            await Task.Yield();
            using var model = connection.CreateModel();
            model.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            model.BasicQos(0, 1, false);
            var result = new List<string>();
            while (true)
            {
                try
                {
                    var tmp = await GetNextMessageAsync(model, queueName, ct, 
                        TimeSpan.FromSeconds(1),
                        TimeSpan.FromMilliseconds(100));
                    if (tmp == null)
                        break;

                    var body = Encoding.UTF8.GetString(tmp.Body.Span);
                    result.Add(body);
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    throw;
                }
            }
            return result;
        }

        private static async Task<BasicGetResult> GetNextMessageAsync(IModel model, string queueName, CancellationToken ct, TimeSpan timeout, TimeSpan checkInterval)
        {
            ct.ThrowIfCancellationRequested();
            
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(timeout);
            while (true)
            {
                var tmp = model.BasicGet(queueName, false);
                if (tmp != null)
                    return tmp;

                try
                {
                    await Task.Delay(checkInterval, cts.Token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    return model.BasicGet(queueName, false);
                }
            }
        }
    }
}

队列通常被设计为不是查看消息的“数据库”,RabbitMQ - 就是其中一种情况,它不是为拉取消息而设计的,而且 basic.get 效率不高(它不会让你批量获取消息),但对于 UI/小队列来说应该没问题。

附言

根据我在各种队列中的经验,RabbitMQ是其中最差的

  • 框架和服务器配置过于复杂,吃资源多,难以管理
  • 偶尔脑裂(这实际上会导致所有/部分消息丢失)很难进行手动恢复。
  • 内存溢出也很常见,需要经常监控
  • 客户是有状态的,有时他们几乎可以挂在任何地方。服务重启不可恢复。需要通过健康检查进行持续监控。
  • 您可以忘记正常关机,客户端在后台挂起。更糟糕的是 - 在终结器部分。
  • 有很多消费者/生产者的队列导致连接流失。

如果可以就使用 Kafka

  • 很简单(实际上只是一堆日志文件和带有偏移量的消费者)
  • 提供相同的性能(超过 10-100k rps 在三个节点上很容易)
  • 能做RabbitMQ能做的,broker该做的。
  • 让您以任何方式(通过水印)查看队列,许多云提供商默认将其作为其资产之一。
  • 有第 3 方用户界面来查看/管理主题/分区/消息

这里的比较非常明显:https://www.projectpro.io/article/kafka-vs-rabbitmq/451

© www.soinside.com 2019 - 2024. All rights reserved.