我学习如何通过https://www.rabbitmq.com/dotnet-api-guide.html或同一网站上的教程使用RabbitMQ的消息。
在控制台应用程序上很好用!我在做什么?我只使用消息,然后发布退货,以便队列中的消息完成并删除。没关系!现在,我正在尝试制作一个多线程Windows服务来执行此操作,因为我需要一种更好,更快的方式来消耗队列中的许多消息。
我发现很难找到信息来创建服务,所以我决定问您是否可以帮助我
首先,有一种我不明白的奇怪情况。让我们在下面看到消费消息的基本结构:
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",autoAck: false,consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
=>> Console.ReadLine(); <<=
}
}
请参见“ Console.ReadLine();”?在控制台应用程序上,此命令可防止连接和通道配置!没有此RabbitMQ会返回错误消息! (代码= 200,文本=“再见”)!但没关系,我不会在控制台应用程序上删除它!但是如何在Windows服务上使用它?
所以我找到了一个解决方案HERE,我可以提供一项使用消息的服务,但仅适用于1或2条消息。我有使用消息的线程,但是如果队列中有很多消息,则使用者(线程)会尝试使用同一条消息,或者不接收任何消息,从而导致错误。我尝试了不同的方法来解决此问题,但结果是相同的。我想提供一种可以与控制台应用程序一起运行的服务,但是可以通过多线程方式更快地进行服务。
我要做的是:1-消费信息2-使用消息中的数据执行某些操作3-将返回的消息发布到该消息的队列中,以便消息完成。
我的代码:
public partial class Service1 : ServiceBase
{
public static ConnectionFactory factory;
public static string fila = "QUEUE";
public static int threads = 0;
private Thread executeThread;
public Service1()
{
InitializeComponent();
}
protected override void OnStart(string[] args)
{
factory = new ConnectionFactory();
{
System.Uri uri = new System.Uri("amqp://ARABBITMQNURL:8080");
factory.Uri = uri;
};
try
{
executeThread = new Thread(new ThreadStart(start));
executeThread.Start();
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n starting");
}
catch (Exception ex)
{
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n starting error: " + ex.Message);
}
}
private static void start()
{
while (true)
{
ThreadStart go = new ThreadStart(ServerRPC);
Thread T = new Thread(go);
T.Start();
Thread.Sleep(1000);
}
}
protected override void OnStop()
{
EventLog.WriteEntry("Ivi client stopped", EventLogEntryType.Warning);
}
private static void ServerRPC()
{
threads++;
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n"+ threads);
IConnection connection = factory.CreateConnection();
IModel channel = connection.CreateModel();
channel.QueueDeclare(queue: fila, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
ConsumoMSG.Consumer(fila, consumer, channel, connection);
channel.BasicConsume(queue: fila, autoAck: false, consumer: consumer);
}
}
public static void Consumer(string fila, EventingBasicConsumer consumer, IModel channel, IConnection connection)
{
channel.BasicConsume(queue: fila, autoAck: false, consumer: consumer);
consumer.Received += (ch, ea) =>
{
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
var message = Encoding.UTF8.GetString(body);
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n " + message);
Execute.Powershell(); //this start a ps1 that sleep for 5 sec.
var response = "{\"code\":\"500\",\"response\":\"PowershellReturn\"}";
var responseBytes = Encoding.UTF8.GetBytes(response);
try
{
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
channel.BasicAck(ea.DeliveryTag, false);
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n PUBLISH OK!! routingKey: " + props.ReplyTo + "basicProperties: " + replyProps);
Service1.threads--;
}
catch (Exception e)
{
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n ERROR PUBLISH: " + e.Message);
File.AppendAllText(@"C:\Temp\iviservicetesterabbit.txt", "\r\n ERROR PUBLISH routingKey: " + props.ReplyTo + "basicProperties: " + replyProps);
}
channel.Close();
connection.Close();
};
}
}
我是否可以使用另一种方法?我在做什么错?
谢谢!
此代码作为Windows服务。它使用TopShelf(http://topshelf-project.com)。
请阅读第二行中的注释。
class Program
{
//I declare as class fields so they are not garbage collected and the consumer can stay alive
private EventingBasicConsumer _consumer;
private IModel _channel;
private IConnection _connection;
public bool Stop()
{
_channel.Dispose();
_connection.Dispose();
return true;
}
public bool Start()
{
var factory = new ConnectionFactory() {HostName = "localhost"};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
_consumer = new EventingBasicConsumer(_channel);
_consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
};
_channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: _consumer);
return true;
}
public static void Main()
{
var rc = HostFactory.Run(x => //1
{
x.Service<Program>(s => //2
{
s.ConstructUsing(name => new Program()); //3
s.WhenStarted(tc => tc.Start()); //4
s.WhenStopped(tc => tc.Stop()); //5
});
x.RunAsLocalSystem(); //6
x.SetDescription("Sample Topshelf Host"); //7
x.SetDisplayName("Stuff"); //8
x.SetServiceName("Stuff"); //9
}); //10
var exitCode = (int) Convert.ChangeType(rc, rc.GetTypeCode()); //11
Environment.ExitCode = exitCode;
}
}