我在多租户应用程序中将 MassTransit 与 RabbitMQ 一起使用。我使用 IFilter 实现了自定义发件箱模式,该模式应该只将消息保存在我的数据库中,但我发现它还会将消息发送到 RabbitMQ。
我的理解是,您应该显式调用 await next.Send(context); 来继续管道,但无论如何它都会发送消息。
这里有一个简化的应用程序,显示了行为:消息已发送到队列,但我希望它没有发送到队列。 如何避免发送该消息?
using MassTransit;
namespace Test;
public class Program
{
public static async Task Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
IServiceCollection services = builder.Services;
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
// To intercept sending
cfg.UseSendFilter(typeof(OutboxPreSendFilter<>), context);
cfg.Host("localhost", 5672, "/", h =>
{
h.Username("test");
h.Password("test");
});
});
});
var app = builder.Build();
using var scope = app.Services.CreateScope();
var sendEndpointProvider = scope.ServiceProvider.GetRequiredService<ISendEndpointProvider>();
// Send message to RabbitMQ
await SendAndReceiveFromQueue(sendEndpointProvider);
app.Run();
}
public static async Task SendAndReceiveFromQueue(ISendEndpointProvider sendEndpointProvider)
{
var messageToSent = Guid.NewGuid().ToString();
var sendEndpoint = await sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{nameof(TestMsg)}"));
// Send message to RabbitMQ
await sendEndpoint.Send(new TestMsg() { Id = 1, Name = messageToSent, Description = $"{DateTime.Now:f}" });
}
}
public class OutboxPreSendFilter<T> : IFilter<SendContext<T>> where T : class
{
public async Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
{
// I do stuff in here in my app
// I don't want to send anything, so next line is commented !!!
// await next.Send(context);
}
public void Probe(ProbeContext context)
{
context.CreateFilterScope("outboxPreSendFilter");
}
}
public class TestMsg
{
public int Id { get; set; }
public string? Name { get; set; }
public string? Description { get; set; }
}
发送管道不是发送控制流路径的一部分,与发布相同。