RABBITMQ C#在控制台上运行良好,但不适用于服务

问题描述 投票:0回答:1

我学习如何通过https://www.rabbitmq.com/dotnet-api-guide.html或同一网站上的教程使用RabbitMQ的消息。

在控制台应用程序上很好用!我在做什么?我只使用消息,然后发布退货,以便队列中的消息完成并删除。没关系!现在,我正在尝试制作一个多线程Windows服务来执行此操作,因为我需要一种更好,更快的方式来消耗队列中的许多消息。

我发现很难找到信息来创建服务,所以我决定问您是否可以帮助我

首先,有一种我不明白的奇怪情况。让我们在下面看到消费消息的基本结构:

public static void Main()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using(var connection = factory.CreateConnection())
    using(var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false, arguments: null);

        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        Console.WriteLine(" [*] Waiting for messages.");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] Received {0}", message);

            int dots = message.Split('.').Length - 1;
            Thread.Sleep(dots * 1000);

            Console.WriteLine(" [x] Done");

            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queue: "task_queue",autoAck: false,consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
    =>> Console.ReadLine(); <<=
    }
}

请参见“ Console.ReadLine();”?在控制台应用程序上,此命令可防止连接和通道配置!没有此RabbitMQ会返回错误消息! (代码= 200,文本=“再见”)!但没关系,我不会在控制台应用程序上删除它!但是如何在Windows服务上使用它?

所以我找到了一个解决方案HERE,我可以提供一项使用消息的服务,但仅适用于1或2条消息。我有使用消息的线程,但是如果队列中有很多消息,则使用者(线程)会尝试使用同一条消息,或者不接收任何消息,从而导致错误。我尝试了不同的方法来解决此问题,但结果是相同的。我想提供一种可以与控制台应用程序一起运行的服务,但是可以通过多线程方式更快地进行服务。

我要做的是:1-消费信息2-使用消息中的数据执行某些操作3-将返回的消息发布到该消息的队列中,以便消息完成。

我的代码:

public partial class Service1 : ServiceBase
{
    public static ConnectionFactory factory;
    public static string fila = "QUEUE";
    public static int threads = 0;
    private Thread executeThread;


    public Service1()
    {
        InitializeComponent();
    }

    protected override void OnStart(string[] args)
    {
        factory = new ConnectionFactory();
        {
            System.Uri uri = new System.Uri("amqp://ARABBITMQNURL:8080");
            factory.Uri = uri;
        };

        try
        {
            executeThread = new Thread(new ThreadStart(start));
            executeThread.Start();
            File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n starting");

        }
        catch (Exception ex)
        {
            File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n starting error: " + ex.Message);
        }

    }

    private static void start()
    {
        while (true)
        {

            ThreadStart go = new ThreadStart(ServerRPC);
            Thread T = new Thread(go);
            T.Start();

            Thread.Sleep(1000);

        }
    }

    protected override void OnStop()
    {
        EventLog.WriteEntry("Ivi client stopped", EventLogEntryType.Warning);
    }

    private static void ServerRPC()
    {
        threads++;

        File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n"+ threads);

        IConnection connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();

        channel.QueueDeclare(queue: fila, durable: true, exclusive: false, autoDelete: false, arguments: null);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);

        ConsumoMSG.Consumer(fila, consumer, channel, connection);

        channel.BasicConsume(queue: fila, autoAck: false, consumer: consumer);

    }

}

public static void Consumer(string fila, EventingBasicConsumer consumer, IModel channel, IConnection connection)
    {
        channel.BasicConsume(queue: fila, autoAck: false, consumer: consumer);

        consumer.Received += (ch, ea) =>
        {
            var body = ea.Body;
            var props = ea.BasicProperties;
            var replyProps = channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;
            var message = Encoding.UTF8.GetString(body);

            File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n " + message);

                Execute.Powershell(); //this start a ps1 that sleep for 5 sec.

                var response = "{\"code\":\"500\",\"response\":\"PowershellReturn\"}";

                var responseBytes = Encoding.UTF8.GetBytes(response);

                try
                {
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(ea.DeliveryTag, false);
                    File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n PUBLISH OK!! routingKey: " + props.ReplyTo + "basicProperties: " + replyProps);
                Service1.threads--;
                }
                catch (Exception e)
                {
                    File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n ERROR PUBLISH: " + e.Message);
                    File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n ERROR PUBLISH routingKey: " + props.ReplyTo + "basicProperties: " + replyProps);
                }
            channel.Close();
            connection.Close();

        };
    }
}

我是否可以使用另一种方法?我在做什么错?

谢谢!

c# multithreading service rabbitmq
1个回答
0
投票

此代码作为Windows服务。它使用TopShelf(http://topshelf-project.com)。

请阅读第二行中的注释。

class Program
    {
        //I declare as class fields so they are not garbage collected and the consumer can stay alive
        private EventingBasicConsumer _consumer;
        private IModel _channel;
        private IConnection _connection;

        public bool Stop()
        {
            _channel.Dispose();
            _connection.Dispose();
            return true;
        }

        public bool Start()
        {

            var factory = new ConnectionFactory() {HostName = "localhost"};
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            _channel.QueueDeclare(queue: "hello",
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            _consumer = new EventingBasicConsumer(_channel);
            _consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

            };
            _channel.BasicConsume(queue: "hello",
                autoAck: true,
                consumer: _consumer);

            return true;
        }


        public static void Main()
        {

            var rc = HostFactory.Run(x => //1
            {
                x.Service<Program>(s => //2
                {
                    s.ConstructUsing(name => new Program()); //3
                    s.WhenStarted(tc => tc.Start()); //4
                    s.WhenStopped(tc => tc.Stop()); //5
                });
                x.RunAsLocalSystem(); //6

                x.SetDescription("Sample Topshelf Host"); //7
                x.SetDisplayName("Stuff"); //8
                x.SetServiceName("Stuff"); //9
            }); //10

            var exitCode = (int) Convert.ChangeType(rc, rc.GetTypeCode()); //11
            Environment.ExitCode = exitCode;
        }
    }
© www.soinside.com 2019 - 2024. All rights reserved.