我们有一些队列,具有相同数量的错误队列。
我的老板让我写一个Window Service来消费错误队列,并将数据传输到DB。 Windows 服务应该能够根据数据库表中设置的配置来管理应该使用哪些队列,这意味着,例如,如果我不想再使用特定队列, 我必须能够在数据库上禁用该特定队列的配置,而无需停止窗口服务。
我计划在 Quartz 的帮助下编写一个 Windows 服务作业计划,每个计划的作业都应该读取数据库配置,打开一个连接, 启动要消耗的所有队列的任务,每个任务使用单个通道消耗队列。我希望通过这个解决方案来解决,因为任务结束时通道将关闭, 连接将被关闭,错误队列将被消耗,并且在下一个作业计划中要读取的队列的数量和名称可能会有所不同。 此外,在每个计划中消耗每个队列的所有消息应该节省打开/关闭连接/通道,如果我每个计划消耗一条消息,这可能会很繁重, 因此,调度的时间应该足以在下一个调度之前消耗队列中的所有错误消息。 此外,调度队列将使我有机会配置要在数据库中使用的队列,而无需停止 Windows 服务。
现在我写了一些代码来测试消费单个队列的解决方案
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueParName, autoAck: false, consumer: consumer);
bool queueEmpty = false;
while (!queueEmpty)
{
try
BasicDeliverEventArgs result;
bool bRead = consumer.Queue.Dequeue(timeOutQueueEmpty, out result);
if (bRead)
{
var msgBody = Encoding.UTF8.GetString(result.Body);
// TO DB ...
}
else
{
queueEmpty = true;
}
}
catch (EndOfStreamException ex)
{
// ...
}
}
问题是 QueueingBasicConsumer 已过时,并且在许多地方都是为了避免担心 EventBasicConsumer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// TO DB...
};
channel.BasicConsume(queue: queueParName, autoAck: true, consumer: consumer);
但是使用 EventBasicConsumer 我无法理解如何在队列为空时停止消费以关闭连接和通道
编辑
按照评论中的要求进行一些更深入的解释。 把所有代码放在这里会很困难,因为我们使用了很多我们公司的编译库,所以代码不会完全理解。 无论如何,简化一下:
...
var schedule = SimpleScheduleBuilder.Create();
schedule.WithIntervalInSeconds(ConfigurationManager.AppSettings["..."]);
schedule.RepeatForever();
s.ScheduleQuartzJob(q =>
q.WithJob(() =>
JobBuilder.Create<RabbitErrorDequeuerJob>().Build())
.AddTrigger(() =>
TriggerBuilder.Create()
.WithSchedule(schedule)
.Build())
);
...
有 Ninject IOC,并在模块文件中注入连接
// *** Ninject disposes every Disposable object that has another scope other than InTransientScope
Bind<IConnection>().ToMethod(x =>
{
IConnectionFactory cnf = new ConnectionFactory();
cnf.Uri = new Uri(ConfigurationManager.AppSettings["..."]);
return cnf.CreateConnection();
}).InCallScope();
我有一个Job项目,由Quartz每隔N次调度一次,其中我有Execute方法(Quartz.IJob接口)
public void Execute(IJobExecutionContext context)
{
try
{
List<RabbitQueueConfiguration> lst = //...LIST OF QUEUTE TO DEQUE FROM DATABASE
foreach (RabbitQueueConfiguration queue in lst)
{
Task t = Task.Factory.StartNew(() =>
{
DequeuSingleQueue(queue);
});
}
}
catch (Exception ex)
{
_log.FatalFormat("Error ", ex.Message);
throw;
}
}
DequeuSingleQueue(queue)里面有出队的核心
BasicGet
一次处理一条消息。