我尝试通过定时后台任务设置Rabbitmq使用者,但是,它并不总是正确地使用消息。 它似乎是从队列中获取消息,但是没有正确记录接收到的消息。这似乎很奇怪。我想这与事件处理程序接收到消息的方式不正确有关,但是我只是不知道为什么。
我执行Background tasks with hosted services in ASP.NET Core中讨论的定时后台任务,因为我想避免连接寿命长。我觉得在需要时创建新的连接来排队更安全。
但是,到目前为止,使用具有长期连接的后台服务时,到目前为止一切正常。
有人可以告诉我以下代码有什么问题吗?
// consumer part
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace play_mq
{
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureLogging(logging =>
{
logging.AddConsole();
})
.ConfigureServices(services =>
{
services.AddHostedService<MessageListener>();
});
}
public class MessageListener : IHostedService, IDisposable
{
private readonly IOptions<RabbitOptions> _rabbitOptions;
private readonly ILogger<MessageListener> _logger;
private Timer _timer;
private int executionCount = 0;
public MessageListener(ILogger<MessageListener> logger, IOptions<RabbitOptions> rabbitOptions)
{
_logger = logger;
_rabbitOptions = rabbitOptions;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_timer = new Timer(DoWork, null, TimeSpan.Zero,
TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
private void DoWork(object state)
{
var count = Interlocked.Increment(ref executionCount);
_logger.LogInformation(
"MessageListener is working. Count: {Count}", count);
var factory = new ConnectionFactory()
{
HostName = "localhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
_logger.LogInformation($"consume {message}");
};
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("MessageListener is stopping.");
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
}
public void Dispose()
{
_timer?.Dispose();
}
}
}
// Sender comes from official example
using System;
using RabbitMQ.Client;
using System.Text;
namespace ConsoleMq
{
class Send
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost", Port = 14000 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
似乎从队列中获取消息,但是没有正确记录接收到的消息。
原因:
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
autoAck设置为true,它在收到消息之前对代理确认。确认后,经纪人将其从队列中删除+
解决方案:而不是在此处设置确认,而是在收到消息时确认]
consumer.Received += (model, ea) =>
{
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
//Other method available for negative ack
};
我想避免建立长期连接。我认为在需要时创建新的?connections进行排队更安全。
为了更好地回答您的问题,您可以检查MassTransit(RabbitMQ包装器)代码。
和在发布时接收消息
public class MessageListener : IHostedService, IDisposable
{
private readonly IOptions<RMQConfig> _rabbitOptions;
private readonly ILogger<MessageListener> _logger;
private Timer _timer;
private int executionCount = 0;
private readonly IConnection _connection;
public MessageListener(ILogger<MessageListener> logger, IOptions<RMQConfig> rabbitOptions)
{
_logger = logger;
_rabbitOptions = rabbitOptions;
var factory = new ConnectionFactory()
{
HostName = "localhost"
};
_connection = factory.CreateConnection();
}
public Task StartAsync(CancellationToken cancellationToken)
{
_timer = new Timer(DoWork, null, TimeSpan.Zero,
TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
private void DoWork(object state)
{
var count = Interlocked.Increment(ref executionCount);
_logger.LogInformation(
"MessageListener is working. Count: {Count}", count);
// using (var connection = factory.CreateConnection())
// using (var channel = connection.CreateModel())
// {
var channel = _connection.CreateModel();
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
_logger.LogInformation($"consume {message}");
};
channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
// }
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("MessageListener is stopping.");
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
}
public void Dispose()
{
//Close _connection
_timer?.Dispose();
}
}
几个月前,我遇到了同样的问题,希望对其他人有帮助。