如何让RabbitMQ从队列中一一读取?

问题描述 投票:0回答:2

我只是在玩 RabbitMQ 并尝试在两个 C# 项目中设置测试发送器和接收器。

TestSender.cs

using System;
using RabbitMQ.Client;
using System.Text;

public class TestSender
{
    public TestSender()
    {
    }

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Test Queue",
                                    durable: true,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                Console.WriteLine(" Press [S] to send a message, [Enter] to exit.");

                ConsoleKey key;
                int messageId = 0;
                while (true)
                {
                    key = Console.ReadKey(true).Key;

                    if (key == ConsoleKey.Enter)
                        break;

                    if (key == ConsoleKey.S)
                    {
                        string message = "Message " + (++messageId).ToString();
                        var body = Encoding.UTF8.GetBytes(message);

                        channel.BasicPublish(exchange: "",
                                             routingKey: "hello",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine("Sent {0}", message);
                    }
                }
            }
        }
    }
}

TestReceiver.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

public class TestReceiver
{
    public TestReceiver()
    {
    }

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                // Declare the queue here because we might start the consumer before the publisher 
                // so the queue should exist before we try to consume messages from it.
                channel.QueueDeclare(queue: "Test Queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                Console.WriteLine(" Press [R] to receive a message, [Enter] to exit.");

                // Register a consumer to listen to a specific queue. 
                channel.BasicConsume(queue: "Test Queue",
                                     autoAck: true,
                                     consumer: consumer);

                ConsoleKey key;
                while (true)
                {
                    key = Console.ReadKey(true).Key;

                    if (key == ConsoleKey.Enter)
                        break;

                    if (key == ConsoleKey.R)
                    {
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine("Received {0}", message);
                        };


                    }
                }
            }
        }
    }
}

我的设置方式是,如果您按

S
键,它会向队列发送一条消息,如果您按
R
键,它会从队列中读取消息。发送有效,但接收完全没有任何作用。我删除了
while
循环和按键代码,并且接收工作正常。但是,我想弄清楚如何让消息被一条一条地消费,而不是一次全部消费。另外,有谁知道RabbitMQ是否可以将消息持久化在队列中,或者在消费时是否必须出列?

c# .net rabbitmq message-queue messagebroker
2个回答
2
投票

当您确认当前消息时,RabbitMQ 会向消费者/订阅者发送一条新消息。 RabbitMQ 会将消息保留在队列中(仅限持久模式),直到未被确认为止。

从channel.BasicConsume函数中删除

autoAck: true选项。使用channel.BasicAck函数在按下R键时确认消息。


0
投票
为了确保您的消费者一条一条地处理消息,即使队列中有多条消息,您也可以调整预取计数并管理确认过程。

//Set the prefetch count to 1 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
修改您的消费者代码以包含预取计数设置并处理确认:

// Register a consumer to listen to a specific queue. channel.BasicConsume(queue: "Test Queue", autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received {0}", message); // Acknowledge the message channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); };
channel.BasicAck 调用在处理后确认消息。

© www.soinside.com 2019 - 2024. All rights reserved.