我有一个简单的RabbitMQ发布者和消费者代码,如下所示。
首先,我在My_Tasks队列中创建了10条不同数量的消息。当我尝试逐一获取这些消息,并将autoAck标志设为false时,我可以读取第一条消息,但无法向RabbitMQ服务器发送确认。我得到一个写在下面的错误。
Publisher;
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(qName, durable: true, false, false, null);
var body = Encoding.UTF8.GetBytes(message);
var prop = channel.CreateBasicProperties();
prop.Persistent = true;
channel.BasicPublish("", routingKey: qName, prop, body);
}
}
Consumer.Consumer.RabbitMQ.Client.Exception
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(qName, durable: true, false, false, null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(qName, autoAck: false, consumer);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
}
}
RabbitMQ.Client.Exceptions.AlreadyClosedException: 'Already closed: AMQP操作被中断。AMQP关闭原因,由Application发起,code=200,text='Goodbye',classId=0,methodId=0
在 RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) 在 RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory 1 body) 在 RabbitMQ.Client.Framing.Impl.Model. BasicAck(UInt64 deliveryTag, Boolean multiple) 在RabbitMQ.Client.Impl.RecoveryAwareModel.BasicAck(UInt64 deliveryTag, Boolean multiple) 在QueueExample.Consumer. Program.<>c__DisplayClass0_0.b__0(Object model,BasicDeliverEventArgs ea)在D:Projects/\RabbitMQTutorial/QueueExample/QueueExample.Consumer/Program.cs:第36行,在RabbitMQ.Client.Events.EventingBasicConsumer. HandleBasicDeliver(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, ReadOnlyMemory`1 body) at RabbitMQ.Client.Impl.ConcurrentConsumerDispatcher.<>c__DisplayClass10_0.b__0()
谢谢你的帮助
这应该是一个评论,但我缺少信誉。
你的解决方案已经成功了吗?你在哪里定义你的连接到经纪人?你是在docker-compose配置中使用代理吗?在这种情况下,你的broker必须连接到
rabbitmq://host.docker.internal
当我开始在.Net Core中使用RabbitMQ时,我也遇到了同样的问题.由于我的应用程序(微服务模式)处于早期阶段,我改用了 大众交通框架 它可以处理一切。我可以强烈推荐它,因为它 "只是工作",你可以完全专注于你想要的系统功能。
我找到了错误的答案。我的示例应用程序是一个控制台应用程序,所以当消费者收到消息时,连接已经关闭。当写一个Connsole.ReadLine()来等待关闭连接的响应时,所有的消息都可以被消费者读取。
这是暂时的解决方案,你可以从这个角度找到自己的解决方案。
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(qName, durable: true, false, false, null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(qName, autoAck: false, consumer);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
//Solution is here
Console.WriteLine("Press Any Key to Continue..");
Console.ReadLine();
}
}