Setting up a RabbitMQ consumer in .NET Core via a timed background task

Question (votes: 0, answers: 1)

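I am trying to set up a RabbitMQ consumer via a timed background task, but it does not always consume messages correctly. It appears to take messages off the queue, yet the received messages are not logged properly, which seems odd. I suspect the event handler is not receiving the messages the way it should, but I cannot figure out why.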

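I implemented the timed background task described in "Background tasks with hosted services in ASP.NET Core" because I want to avoid long-lived connections. I feel it is safer to create a new connection to the queue whenever one is needed.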

However, when I use a background service with a long-lived connection instead, everything has worked fine so far.

Can anyone tell me what is wrong with the code below?

// consumer part
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace play_mq
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureLogging(logging =>
                {
                    logging.AddConsole();
                })
                .ConfigureServices(services =>
                {
                    services.AddHostedService<MessageListener>();
                });
    }

    public class MessageListener : IHostedService, IDisposable
    {
        private readonly IOptions<RabbitOptions> _rabbitOptions;
        private readonly ILogger<MessageListener> _logger;
        private Timer _timer;
        private int executionCount = 0;

        public MessageListener(ILogger<MessageListener> logger, IOptions<RabbitOptions> rabbitOptions)
        {
            _logger = logger;
            _rabbitOptions = rabbitOptions;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _timer = new Timer(DoWork, null, TimeSpan.Zero,
                TimeSpan.FromSeconds(5));

            return Task.CompletedTask;

        }

        private void DoWork(object state)
        {
            var count = Interlocked.Increment(ref executionCount);

            _logger.LogInformation(
                "MessageListener is working. Count: {Count}", count);

            var factory = new ConnectionFactory()
            {
                HostName = "localhost"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    _logger.LogInformation($"consume {message}");
                };
                channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("MessageListener is stopping.");

            _timer?.Change(Timeout.Infinite, 0);

            return Task.CompletedTask;
        }

        public void Dispose()
        {
            _timer?.Dispose();
        }
    }

}
// Sender comes from official example
using System;
using RabbitMQ.Client;
using System.Text;

namespace ConsoleMq
{
    class Send
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", Port = 14000 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
c# asp.net-core .net-core rabbitmq
1 Answer (votes: 0)

"It appears to take messages off the queue, yet the received messages are not logged properly."

Cause:

channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);

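With autoAck set to true, the broker treats a message as acknowledged as soon as it is delivered, before your handler has processed it. Once a message is acknowledged, the broker removes it from the queue.

Solution: instead of acknowledging automatically here, acknowledge explicitly once the message has been received: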

consumer.Received += (model, ea) =>
{
    // Acknowledge this single delivery after it has been handled.
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    // BasicNack and BasicReject are available for negative acknowledgement.
};
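To make the acknowledgement flow above concrete, here is a minimal sketch of a handler that acks only after processing succeeds and nacks otherwise. It assumes the synchronous RabbitMQ.Client 5.x/6.x API used in the question; `AckingConsumer`, `Start`, and `handleMessage` are hypothetical names introduced for illustration, not part of the library.

```csharp
using System;
using System.Linq;   // only needed on client versions where ea.Body is a byte[]
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical helper, not part of RabbitMQ.Client.
public static class AckingConsumer
{
    // Registers a consumer with manual acknowledgements and returns the consumer tag.
    // The channel must stay open for as long as messages should be delivered.
    public static string Start(IModel channel, string queue, Action<string> handleMessage)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            try
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                handleMessage(message);
                // Ack only after the handler has finished without throwing.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception)
            {
                // Negative ack: hand the message back to the broker for redelivery.
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        // autoAck: false means the broker keeps the message until BasicAck arrives.
        return channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
    }
}
```

Requeueing on every failure is a simplification: a message that always fails would be redelivered indefinitely, so a real service would probably pass requeue: false or route such messages to a dead-letter queue.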

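"I want to avoid long-lived connections. I feel it is safer to create a new connection to the queue whenever one is needed."

For a better answer to this part of your question, you can look at the source code of MassTransit (a RabbitMQ wrapper).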
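If short-lived connections on a timer are still preferred, one option is to pull messages instead of registering a push consumer. In the question's code the `using` block disposes the channel and connection as soon as `BasicConsume` returns, so the `Received` handler may never get a chance to run. The sketch below uses `BasicGet`, which returns `null` when the queue is empty, so all work finishes before the connection is disposed. It assumes the synchronous RabbitMQ.Client 5.x/6.x API; `QueuePoller`, `Drain`, and `maxMessages` are hypothetical names chosen for illustration.

```csharp
using System;
using System.Linq;   // only needed on client versions where Body is a byte[]
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper, not part of RabbitMQ.Client.
public static class QueuePoller
{
    // Opens a connection, handles up to maxMessages, then closes everything.
    // Returns the number of messages that were handled and acknowledged.
    public static int Drain(string hostName, string queue, int maxMessages, Action<string> handle)
    {
        var factory = new ConnectionFactory { HostName = hostName };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: queue, durable: false, exclusive: false,
                                 autoDelete: false, arguments: null);

            var handled = 0;
            while (handled < maxMessages)
            {
                // Pull one message; null means the queue is currently empty.
                BasicGetResult result = channel.BasicGet(queue: queue, autoAck: false);
                if (result == null)
                {
                    break;
                }

                handle(Encoding.UTF8.GetString(result.Body.ToArray()));

                // Ack after handling, while the channel is still open.
                channel.BasicAck(deliveryTag: result.DeliveryTag, multiple: false);
                handled++;
            }

            return handled;
        }
    }
}
```

From the timer callback this could be called as `QueuePoller.Drain("localhost", "hello", 100, m => _logger.LogInformation("consume {Message}", m));`. Polling costs a round trip per message and a new connection per tick, and timer callbacks can overlap if a run takes longer than the interval, so a long-lived consumer is usually the more efficient choice when throughput matters.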

Receiving messages as they are published:

public class MessageListener : IHostedService, IDisposable
    {
        private readonly IOptions<RMQConfig> _rabbitOptions;
        private readonly ILogger<MessageListener> _logger;
        private Timer _timer;
        private int executionCount = 0;
        private readonly IConnection _connection;

        public MessageListener(ILogger<MessageListener> logger, IOptions<RMQConfig> rabbitOptions)
        {
            _logger = logger;
            _rabbitOptions = rabbitOptions;
            var factory = new ConnectionFactory()
            {
                HostName = "localhost"
            };
            _connection = factory.CreateConnection();
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _timer = new Timer(DoWork, null, TimeSpan.Zero,
                TimeSpan.FromSeconds(5));

            return Task.CompletedTask;

        }

        private void DoWork(object state)
        {
            var count = Interlocked.Increment(ref executionCount);

            _logger.LogInformation(
                "MessageListener is working. Count: {Count}", count);



            // Reuse the long-lived connection instead of opening and disposing a new one on
            // every tick: disposing the channel right after BasicConsume stops the consumer.
            // Note: each tick still opens a new channel and registers another consumer.
            var channel = _connection.CreateModel();

            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                _logger.LogInformation($"consume {message}");
                // Acknowledge only after the message has been handled (autoAck is false below).
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("MessageListener is stopping.");

            _timer?.Change(Timeout.Infinite, 0);

            return Task.CompletedTask;
        }

        public void Dispose()
        {
            _timer?.Dispose();
            // Close the long-lived connection (this also closes its channels).
            _connection?.Dispose();
        }
    }

I ran into the same problem a few months ago; I hope this helps others.
