我正在将 MassTransit 用于带有 .net6 的 RabbitMQ 函数。
我使用下面的代码来发送/订阅消息。
但是我发现当我从 RabbitMQ 管理网络以消费者格式发布消息时。
所有消费者都会被触发,而不是只触发满足有效负载格式的消费者。
我知道如果我将消息包裹在信封中可以解决问题。
但由于某种原因,我不想使用该格式,因为它包含太多我不想记录的消息。
还有其他方法可以解决这个问题吗?
例如,使用 header 来区分哪个消费者应该接收消息?
日志将是:
2024-08-27 11:47:50.3704 INFO [ProductConsumer] Message received context: {"Message":{"Code":"_code","Name":"_name"}}
2024-08-27 11:47:50.4210 INFO [NameConsumer] Message received context: {"Message":{"Name":null}}
这是我的代码:
程序.cs
services.AddMassTransit(x =>
{
x.AddConsumer<ProductConsumer>();
x.AddConsumer<NameConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host($"myRabbitHost.com", 5672, "VHOST" , configurator =>
{
configurator.Username("_user");
configurator.Password("_pwd");
});
cfg.ReceiveEndpoint("AUTO.QUEUENAME", e =>
{
e.UseRawJsonSerializer();
e.ConfigureConsumer<ProductConsumer>(context);
e.ConfigureConsumer<NameConsumer>(context);
});
cfg.DefaultContentType = new ContentType("application/json");
cfg.UseRawJsonDeserializer();
cfg.ConfigureEndpoints(context);
});
});
名称消费者.cs
public class NameConsumer: IConsumer<NameMessage>
{
private readonly ILogger<NameConsumer> _logger;
public NameConsumer(ILogger<NameConsumer> logger)
{
_logger=logger;
}
public Task Consume(ConsumeContext<NameMessage> context)
{
_logger.LogInformation(" [NameConsumer] Message received context: {conetxt} ",JsonSerializer.Serialize(context));
return Task.CompletedTask;
}
}
public interface NameMessage
{
public string Name { get; set; }
}
产品消费者.cs
public class ProductConsumer: IConsumer<ProductMessage>
{
private readonly ILogger<ProductConsumer> _logger;
public ProductConsumer(ILogger<ProductConsumer> logger)
{
_logger=logger;
}
public Task Consume(ConsumeContext<ProductMessage> context)
{
_logger.LogInformation(" [ProductConsumer] Message received context: {conetxt} ",JsonSerializer.Serialize(context));
return Task.CompletedTask;
}
}
public interface ProductMessage
{
public string Code { get; set; }
public string Name { get; set; }
}
您可以简单地设置一个条件来检查标题。
public Task Consume(ConsumeContext<NameMessage> context)
{
if (context.Headers.TryGetHeader("ConsumerType", out var headerValue) && headerValue?.ToString() == "TypeA")
{
_logger.LogInformation(" [NameConsumer] Message received context: {conetxt} ", JsonSerializer.Serialize(context));
}
return Task.CompletedTask;
}