我尝试通过定时后台任务设置Rabbitmq使用者,但是,它并不总是正确地使用消息。 它似乎是从队列中获取消息,但是没有正确记录接收到的消息。这似乎很奇怪。我想这与事件处理程序接收到消息的方式不正确有关,但是我只是不知道为什么。
我执行Background tasks with hosted services in ASP.NET Core中讨论的定时后台任务,因为我想避免连接寿命长。我觉得在需要时创建新的连接来排队更安全。
// consumer part
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace play_mq
public class Program
public static void Main(string[] args)
public static IHostBuilder CreateHostBuilder(string[] args) =>
.ConfigureLogging(logging =>
.ConfigureServices(services =>
public class MessageListener : IHostedService, IDisposable
private readonly IOptions<RabbitOptions> _rabbitOptions;
private readonly ILogger<MessageListener> _logger;
private Timer _timer;
private int executionCount = 0;
public MessageListener(ILogger<MessageListener> logger, IOptions<RabbitOptions> rabbitOptions)
_logger = logger;
_rabbitOptions = rabbitOptions;
public Task StartAsync(CancellationToken cancellationToken)
_timer = new Timer(DoWork, null, TimeSpan.Zero,
return Task.CompletedTask;
private void DoWork(object state)
var count = Interlocked.Increment(ref executionCount);
"MessageListener is working. Count: {Count}", count);
var factory = new ConnectionFactory()
HostName = "localhost"
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
_logger.LogInformation($"consume {message}");
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
public Task StopAsync(CancellationToken cancellationToken)
_logger.LogInformation("MessageListener is stopping.");
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
public void Dispose()
// Sender comes from official example
using System;
using RabbitMQ.Client;
using System.Text;
namespace ConsoleMq
class Send
public static void Main()
var factory = new ConnectionFactory() { HostName = "localhost", Port = 14000 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
Console.WriteLine(" Press [enter] to exit.");
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
consumer.Received += (model, ea) =>
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
//Other method available for negative ack
public class MessageListener : IHostedService, IDisposable
private readonly IOptions<RMQConfig> _rabbitOptions;
private readonly ILogger<MessageListener> _logger;
private Timer _timer;
private int executionCount = 0;
private readonly IConnection _connection;
public MessageListener(ILogger<MessageListener> logger, IOptions<RMQConfig> rabbitOptions)
_logger = logger;
_rabbitOptions = rabbitOptions;
var factory = new ConnectionFactory()
HostName = "localhost"
_connection = factory.CreateConnection();
public Task StartAsync(CancellationToken cancellationToken)
_timer = new Timer(DoWork, null, TimeSpan.Zero,
return Task.CompletedTask;
private void DoWork(object state)
var count = Interlocked.Increment(ref executionCount);
"MessageListener is working. Count: {Count}", count);
// using (var connection = factory.CreateConnection())
// using (var channel = connection.CreateModel())
// {
var channel = _connection.CreateModel();
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
_logger.LogInformation($"consume {message}");
channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
// }
public Task StopAsync(CancellationToken cancellationToken)
_logger.LogInformation("MessageListener is stopping.");
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
public void Dispose()
//Close _connection