我需要将我们的多租户应用程序(使用 finbuckle)连接到 MassTransit/RabbitMQ。
该应用程序正在使用:
两种潜在的方法似乎可行:
如本文文章中所述,此方法反映了多租户数据库设置,其中每个表都有一个
tenantId
列,无需单独的数据库即可隔离租户数据。
优点
缺点
优点
缺点
任何建议或代码示例将不胜感激!
我通过为每个租户发出一个 IBus 扩展接口来解决这个问题。然后,我为每个发出的接口调用 AddMassTransit 方法。
internal class Program
{
static void Main(string[] args)
{
var builder = Host.CreateDefaultBuilder(args);
var config = new ConfigurationBuilder().AddJsonFile("appsettings.json", false).Build();
MassTransitConfig massTransitConfig = config.GetSection("MassTransitConfig").Get<MassTransitConfig>();
builder.ConfigureServices((hostContext, services) =>
{
BusControlFactory busControlFactory = new BusControlFactory(new MassTransitConfigDataProvider(massTransitConfig), services, [typeof(NotificationCreatedConsumer1).Assembly]);
busControlFactory.Initialize();
services.AddSingleton(busControlFactory);
});
var app = builder.Build();
// USAGE:
BusControlSupport busControlSupport = new BusControlSupport(app.Services.GetRequiredService<BusControlFactory>(), app.Services);
IPublishEndpoint publishEndpoint = busControlSupport.GetPublishEndpoint("tenant1");
Console.WriteLine(publishEndpoint);
IClientFactory clientFactory = busControlSupport.GetClientFactory("tenant2");
IRequestClient<ICreateOrder> requestClient = clientFactory.CreateRequestClient<ICreateOrder>();
Console.WriteLine(requestClient);
app.Run();
}
}
/// <summary>
/// Gets MassTransit objects belonging to tenant
/// </summary>
public class BusControlSupport
{
private readonly BusControlFactory busControlFactory;
private readonly IServiceProvider serviceProvider;
public BusControlSupport(BusControlFactory busControlFactory, IServiceProvider serviceProvider)
{
this.busControlFactory = busControlFactory;
this.serviceProvider = serviceProvider;
}
public IPublishEndpoint GetPublishEndpoint(string tenantIdentifier)
{
Type openType = typeof(Bind<,>);
Type[] typeParameters = [busControlFactory.BusTypes[tenantIdentifier], typeof(IPublishEndpoint)];
Type closedType = openType.MakeGenericType(typeParameters);
//Bind<IBus, IPublishEndpoint> publishEndpointBind = (Bind<IBus, IPublishEndpoint>)serviceProvider.GetService(closedType);
object publishEndpointBind = serviceProvider.GetService(closedType);
return (IPublishEndpoint)publishEndpointBind.GetType().GetProperty("Value");
}
public IClientFactory GetClientFactory(string tenantIdentifier)
{
Type openType = typeof(Bind<,>);
Type[] typeParameters = [busControlFactory.BusTypes[tenantIdentifier], typeof(IClientFactory)];
Type closedType = openType.MakeGenericType(typeParameters);
//Bind<IBus, IPublishEndpoint> publishEndpointBind = (Bind<IBus, IPublishEndpoint>)serviceProvider.GetService(closedType);
object clientFactoryBind = serviceProvider.GetService(closedType);
return (IClientFactory)clientFactoryBind.GetType().GetProperty("Value");
}
}
/// <summary>
/// Initializes one MassTransit IBus per tenant
/// </summary>
public class BusControlFactory
{
private readonly IMassTransitConfigDataProvider massTransitConfigDataProvider;
private readonly IServiceCollection services;
private readonly Assembly[] consumerAssemblies;
internal IDictionary<string, Type> BusTypes { get; } = new Dictionary<string, Type>();
public BusControlFactory(IMassTransitConfigDataProvider massTransitConfigDataProvider, IServiceCollection services, Assembly[] consumerAssemblies)
{
this.massTransitConfigDataProvider = massTransitConfigDataProvider;
this.services = services;
this.consumerAssemblies = consumerAssemblies;
}
public void Initialize()
{
// get the AddMassTransit<TBus> method using reflection
Type registrationExtensionType = typeof(DependencyInjectionRegistrationExtensions);
IEnumerable<MethodInfo> methodInfos = registrationExtensionType.GetMethods().Where(mi => mi.Name == nameof(DependencyInjectionRegistrationExtensions.AddMassTransit));
MethodInfo? addMassTransitMethodInfo = null;
foreach (MethodInfo methodInfo in methodInfos.Where(mi => mi.IsGenericMethodDefinition && mi.GetGenericArguments().Length == 1))
{
Type genericArgumentType = methodInfo.GetGenericArguments().First();
Type[] genericParameterConstraints = genericArgumentType.GetGenericParameterConstraints();
if (genericParameterConstraints.Length == 1 && genericParameterConstraints.First().FullName == typeof(IBus).FullName)
{
addMassTransitMethodInfo = methodInfo;
break;
}
}
if (addMassTransitMethodInfo == null)
throw new Exception("addMassTransitMethodInfo == null");
// execute AddMassTransit<TBus> for every tenant
foreach (MassTransitTenantConfigData tenantConfigData in massTransitConfigDataProvider.TenantConfigDatas)
{
Type busType = TenantBusInterfaceGenerator.CreateTenantBusInterface(tenantConfigData.TenantId);
BusTypes[tenantConfigData.TenantId] = busType;
MethodInfo addMassTransitMethod = addMassTransitMethodInfo.MakeGenericMethod(busType);
if (addMassTransitMethod == null)
throw new InvalidOperationException($"Could not create AddMassTransit generic method for {busType.FullName}.");
// Define the configuration action
Action<IBusRegistrationConfigurator> configureAction =
busConfigurator =>
{
busConfigurator.SetKebabCaseEndpointNameFormatter();
busConfigurator.AddConsumers(consumerAssemblies);
busConfigurator.UsingRabbitMq((context, busFactoryConfigurator) =>
{
busFactoryConfigurator.Host(tenantConfigData.Uri, hostConfigurator =>
{
hostConfigurator.Username(tenantConfigData.Username);
hostConfigurator.Password(tenantConfigData.Password);
});
busFactoryConfigurator.ConfigureEndpoints(context);
});
};
// invoke AddMassTransit<busType>
addMassTransitMethod.Invoke(null, [services, configureAction]);
}
}
}
/// <summary>
/// Emits interface that extends IBus interface
/// </summary>
public static class TenantBusInterfaceGenerator
{
private static readonly AssemblyBuilder AssemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(new AssemblyName("DynamicTenantBuses"), AssemblyBuilderAccess.Run);
private static readonly ModuleBuilder ModuleBuilder = AssemblyBuilder.DefineDynamicModule("MainModule");
public static Type CreateTenantBusInterface(string tenantName)
{
string interfaceName = $"I{ToValidClassName(tenantName)}Bus";
TypeBuilder typeBuilder = ModuleBuilder.DefineType(interfaceName, TypeAttributes.Public | TypeAttributes.Interface | TypeAttributes.Abstract);
typeBuilder.AddInterfaceImplementation(typeof(IBus));
return typeBuilder.CreateTypeInfo().AsType();
}
private static string ToValidClassName(string input)
{
if (string.IsNullOrWhiteSpace(input))
throw new ArgumentException("Input cannot be null or whitespace.", input);
string cleaned = Regex.Replace(input, @"[^A-Za-z0-9\s]", "");
TextInfo textInfo = CultureInfo.InvariantCulture.TextInfo;
string pascalCaseName = textInfo.ToTitleCase(cleaned).Replace(" ", "");
if (char.IsDigit(pascalCaseName[0]))
pascalCaseName = "_" + pascalCaseName;
return pascalCaseName;
}
}