我使用RabbitMQ点网库在C#中的两个队列上有两个使用者。
我想要的是:
由于某些业务逻辑,我不得不在一个消费者中等待一段时间所以我已经为此目的使用了Thread.Sleep()
问题
如果在一个事件中使用Thread.Sleep
,第二个线程也不会暂停
我的代码:
consumer.Received += (model, ea) =>
{
try
{
DRModel drModel = JsonConvert.DeserializeObject<DRModel>(Encoding.UTF8.GetString(ea.Body));
RMQReturnType type = ProcessSubmitSMS(drModel);
if (type == RMQReturnType.ACK)
channel.BasicAck(ea.DeliveryTag, false);
else
{
channel.BasicNack(ea.DeliveryTag, false, true);
Thread.Sleep(300000); // <=== SLEEP
}
}
catch (Exception ex)
{
channel.BasicNack(ea.DeliveryTag, false, true);
WriteLog(ControlChoice.ListError, "Exception: " + ex.Message + " | Stack Trace: " + ex.StackTrace.ToString() + " | [Consumer Event]");
}
};
Mutex类似乎是一个好例子,您需要的是多线程中的条件睡眠。不完全知道您需要的逻辑,但是您的代码如下:
public class Consumer
{
public event EventHandler Received;
public virtual void OnReceived()
{
Received?.Invoke(this, EventArgs.Empty);
}
}
class Program
{
static void Main(string[] args)
{
var mutex = new Mutex();
var consumer = new Consumer();
consumer.Received += (model, ea) =>
{
try
{
mutex.WaitOne();
var id = Guid.NewGuid().ToString();
Console.WriteLine($"Start mutex {id}");
Console.WriteLine($"Mutex finished {id}");
Console.WriteLine($"Start sleep {id}");
if ( new Random().Next(10000) % 2 == 0)
{
Thread.Sleep(3000); // <=== SLEEP
}
Console.WriteLine($"Sleep finished {id}");
}
catch (Exception ex)
{
mutex.ReleaseMutex(); // this is where you release, if something goes wrong
}
finally
{
mutex.ReleaseMutex();
}
};
Parallel.For(0, 10, t =>
{
consumer.OnReceived();
});
Console.ReadLine();
}
}
}