我正在尝试处理 MassTransit,我有许多带有 StableDefussion 的服务器,它们接受在队列中生成图像的任务
但事实是不同的服务器有不同的模型集,我不想为每个模型创建很多队列
我的 GPU 服务器如何从队列中过滤任务并仅将那些可以执行的任务投入工作?
我的发布服务器
internal class Program {
static async Task Main(string[] args) {
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
cfg.Host(new Uri(ServerConfig.Host), h => {
h.Username(ServerConfig.Username);
h.Password(ServerConfig.Password);
});
cfg.ReceiveEndpoint("calculation_results_queue", e => {
e.Consumer(() => new ResultConsumer());
});
});
await busControl.StartAsync();
Console.WriteLine("Publisher is running...");
await busControl.Publish<CalculationTask>(new {
TaskId = NewId.NextGuid().ToString(),
Data = "Important data for calculation"
}, context => {
context.Headers.Set("model", "model_comic");
});
Console.WriteLine("Press any key to exit");
await Task.Run(() => Console.ReadKey());
await busControl.StopAsync();
}
}
我的 GPU 工作者
internal class Program {
static async Task Main(string[] args) {
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
cfg.Host(new Uri(ServerConfig.Host), h => {
h.Username(ServerConfig.Username);
h.Password(ServerConfig.Password);
});
cfg.ReceiveEndpoint("calculation_task_queue", e => {
e.Bind("headers_exchange", x => {
x.ExchangeType = "headers";
x.SetExchangeArgument("x-match", "any");
x.SetExchangeArgument("model", "model_real");
x.SetExchangeArgument("model", "model_anime");
});
e.Consumer(() => new CalculationTaskConsumer());
});
});
await busControl.StartAsync();
Console.WriteLine("Press any key to exit");
await Task.Run(() => Console.ReadKey());
await busControl.StopAsync();
}
}
我的工作人员不应该接收消息,但他还是这么做了。
cfg.ReceiveEndpoint("calculation_task_queue", e => {
e.ConfigureConsumeTopology = false;
e.Bind("headers_exchange", x => {
x.ExchangeType = "headers";
x.SetExchangeArgument("x-match", "any");
x.SetExchangeArgument("model", "model_real");
x.SetExchangeArgument("model", "model_anime");
});
e.Consumer(() => new CalculationTaskConsumer());
});
默认拓扑配置可能是原因,但您可以通过查看 RabbitMQ 管理控制台并查看附加绑定来找出答案。