RabbitMQ消费者:已关闭异常。

问题描述 投票:0回答:1

我有一个简单的RabbitMQ发布者和消费者代码,如下所示。

首先,我在My_Tasks队列中创建了10条不同数量的消息。当我尝试逐一获取这些消息,并将autoAck标志设为false时,我可以读取第一条消息,但无法向RabbitMQ服务器发送确认。我得到一个写在下面的错误。

Publisher;

var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(qName, durable: true, false, false, null);

        var body = Encoding.UTF8.GetBytes(message);

        var prop = channel.CreateBasicProperties();
        prop.Persistent = true;

        channel.BasicPublish("", routingKey: qName, prop, body);
    }
}

Consumer.Consumer.RabbitMQ.Client.Exception

var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(qName, durable: true, false, false, null);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(qName, autoAck: false, consumer);

        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            channel.BasicAck(ea.DeliveryTag, multiple: false);
        };
    }
}

RabbitMQ.Client.Exceptions.AlreadyClosedException: 'Already closed: AMQP操作被中断。AMQP关闭原因,由Application发起,code=200,text='Goodbye',classId=0,methodId=0

在 RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) 在 RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory 1 body) 在 RabbitMQ.Client.Framing.Impl.Model. BasicAck(UInt64 deliveryTag, Boolean multiple) 在RabbitMQ.Client.Impl.RecoveryAwareModel.BasicAck(UInt64 deliveryTag, Boolean multiple) 在QueueExample.Consumer. Program.<>c__DisplayClass0_0.b__0(Object model,BasicDeliverEventArgs ea)在D:Projects/\RabbitMQTutorial/QueueExample/QueueExample.Consumer/Program.cs:第36行,在RabbitMQ.Client.Events.EventingBasicConsumer. HandleBasicDeliver(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, ReadOnlyMemory`1 body) at RabbitMQ.Client.Impl.ConcurrentConsumerDispatcher.<>c__DisplayClass10_0.b__0()

谢谢你的帮助

.net-core rabbitmq consumer
1个回答
0
投票

这应该是一个评论,但我缺少信誉。

你的解决方案已经成功了吗?你在哪里定义你的连接到经纪人?你是在docker-compose配置中使用代理吗?在这种情况下,你的broker必须连接到

rabbitmq://host.docker.internal

当我开始在.Net Core中使用RabbitMQ时,我也遇到了同样的问题.由于我的应用程序(微服务模式)处于早期阶段,我改用了 大众交通框架 它可以处理一切。我可以强烈推荐它,因为它 "只是工作",你可以完全专注于你想要的系统功能。


0
投票

我找到了错误的答案。我的示例应用程序是一个控制台应用程序,所以当消费者收到消息时,连接已经关闭。当写一个Connsole.ReadLine()来等待关闭连接的响应时,所有的消息都可以被消费者读取。

这是暂时的解决方案,你可以从这个角度找到自己的解决方案。

var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(qName, durable: true, false, false, null);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(qName, autoAck: false, consumer);

        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            channel.BasicAck(ea.DeliveryTag, multiple: false);
        };

        //Solution is here
        Console.WriteLine("Press Any Key to Continue..");
        Console.ReadLine();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.