如何配置 MassTransit 和 RabbitMQ 以实现多租户

问题描述 投票:0回答:1

我需要将我们的多租户应用程序(使用 finbuckle)连接到 MassTransit/RabbitMQ
该应用程序正在使用:

  • Finbuckle 用于多租户支持
  • Autofac与.NET Core DI结合

要求:

  • 目前,租户是通过租户管理应用程序添加的,无需重新启动应用程序。但是,将来,添加新租户时重新启动应用程序是一个选项,允许我在启动时注册 MassTransit 总线实例。
  • 我需要授予其他公司访问特定租户的权限。
  • 我想为每个租户注册消费者(使用租户配置),因为并非所有租户都会与相同的应用程序集成。这是可选要求。
  • 必须使用 DI 获取/实例化消费者

两种潜在的方法似乎可行:

1.具有特定于租户的队列名称的单个虚拟主机

如这篇文章所述,这种方法类似于多租户数据库的设计:每个表都带有一个 tenantId 列,无需为每个租户建立单独的数据库即可隔离租户数据。

优点

  • 配置简单
  • 无需预先创建RabbitMQ虚拟主机

缺点

  • 我不确定如何实施权限来防止一个租户访问另一租户的队列。

2.每个租户专用虚拟主机

优点

  • 彻底隔离租户
  • 更容易将租户限制在自己的虚拟主机上,防止跨租户访问

缺点

  • 每个新租户都需要提前创建一个新的虚拟主机。

其他注意事项:

  • 实施解决方案 2 可能需要为每个虚拟主机手动配置总线。但是,MassTransit 官方文档已不再介绍手动配置总线的方式,这表明它可能不是推荐的做法。
  • MassTransit 的多总线功能在这种情况下是不可接受的,因为它需要为每个总线实例提供单独的接口,这意味着该应用程序需要为每个新租户重新部署。
  • 我不想使用 Autofac 多租户,因为 Finbuckle 已经在应用程序内管理多租户。

任何建议或代码示例将不胜感激!

asp.net-core rabbitmq multi-tenant masstransit
1个回答
0
投票

我的解决办法是:在运行时为每个租户动态生成(emit)一个继承自 IBus 的接口,然后针对每个生成的接口调用一次 AddMassTransit 方法。


/// <summary>
/// Host entry point: registers one MassTransit bus per configured tenant and then
/// demonstrates how to obtain tenant-specific MassTransit services.
/// </summary>
internal class Program
{
    static void Main(string[] args)
    {
        var builder = Host.CreateDefaultBuilder(args);
        var config = new ConfigurationBuilder().AddJsonFile("appsettings.json", false).Build();

        // Get<T>() yields null when the section is absent or cannot be bound; fail fast with a
        // clear message instead of passing null into the data provider.
        MassTransitConfig massTransitConfig = config.GetSection("MassTransitConfig").Get<MassTransitConfig>()
            ?? throw new InvalidOperationException("Configuration section 'MassTransitConfig' is missing or empty in appsettings.json.");

        builder.ConfigureServices((hostContext, services) =>
        {
            // The factory needs the IServiceCollection itself because it calls AddMassTransit<TBus>
            // once per tenant while the container is still being composed.
            BusControlFactory busControlFactory = new BusControlFactory(new MassTransitConfigDataProvider(massTransitConfig), services, [typeof(NotificationCreatedConsumer1).Assembly]);
            busControlFactory.Initialize();

            // Registered so the tenant-id -> bus-type map can be looked up after the host is built.
            services.AddSingleton(busControlFactory);
        });

        var app = builder.Build();

        // USAGE:
        BusControlSupport busControlSupport = new BusControlSupport(app.Services.GetRequiredService<BusControlFactory>(), app.Services);
        IPublishEndpoint publishEndpoint = busControlSupport.GetPublishEndpoint("tenant1");
        Console.WriteLine(publishEndpoint);

        IClientFactory clientFactory = busControlSupport.GetClientFactory("tenant2");
        IRequestClient<ICreateOrder> requestClient = clientFactory.CreateRequestClient<ICreateOrder>();
        Console.WriteLine(requestClient);

        app.Run();
    }
}

/// <summary>
/// Gets MassTransit objects belonging to a tenant.
/// </summary>
/// <remarks>
/// Every tenant has its own bus interface type, recorded in <see cref="BusControlFactory.BusTypes"/>.
/// A tenant-specific service is looked up by closing <c>Bind&lt;,&gt;</c> over that bus type and the
/// requested service type, resolving the closed type from the service provider and reading the
/// wrapper's <c>Value</c> property.
/// </remarks>
public class BusControlSupport
{
    // Name of the property on Bind<TKey, TValue> that exposes the wrapped service.
    private const string BindValuePropertyName = "Value";

    private readonly BusControlFactory busControlFactory;
    private readonly IServiceProvider serviceProvider;

    /// <summary>
    /// Creates the helper.
    /// </summary>
    /// <param name="busControlFactory">Factory holding the tenant-id to bus-type map.</param>
    /// <param name="serviceProvider">Provider the tenant-specific services are resolved from.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public BusControlSupport(BusControlFactory busControlFactory, IServiceProvider serviceProvider)
    {
        this.busControlFactory = busControlFactory ?? throw new ArgumentNullException(nameof(busControlFactory));
        this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    /// <summary>
    /// Returns the <see cref="IPublishEndpoint"/> bound to the given tenant's bus.
    /// </summary>
    /// <param name="tenantIdentifier">Tenant id as used as key in <see cref="BusControlFactory.BusTypes"/>.</param>
    /// <exception cref="KeyNotFoundException">No bus type is known for the tenant.</exception>
    /// <exception cref="InvalidOperationException">The binding is not registered or holds no usable value.</exception>
    public IPublishEndpoint GetPublishEndpoint(string tenantIdentifier)
    {
        return ResolveTenantService<IPublishEndpoint>(tenantIdentifier);
    }

    /// <summary>
    /// Returns the <see cref="IClientFactory"/> bound to the given tenant's bus.
    /// </summary>
    /// <param name="tenantIdentifier">Tenant id as used as key in <see cref="BusControlFactory.BusTypes"/>.</param>
    /// <exception cref="KeyNotFoundException">No bus type is known for the tenant.</exception>
    /// <exception cref="InvalidOperationException">The binding is not registered or holds no usable value.</exception>
    public IClientFactory GetClientFactory(string tenantIdentifier)
    {
        return ResolveTenantService<IClientFactory>(tenantIdentifier);
    }

    /// <summary>
    /// Resolves <c>Bind&lt;TenantBus, TService&gt;</c> for the tenant and unwraps its value.
    /// </summary>
    private TService ResolveTenantService<TService>(string tenantIdentifier)
        where TService : class
    {
        if (!busControlFactory.BusTypes.TryGetValue(tenantIdentifier, out Type? busType))
        {
            throw new KeyNotFoundException($"No MassTransit bus is registered for tenant '{tenantIdentifier}'.");
        }

        // The bus type only exists at runtime, so the Bind<,> type has to be closed via reflection.
        Type bindType = typeof(Bind<,>).MakeGenericType(busType, typeof(TService));

        object bind = serviceProvider.GetService(bindType)
            ?? throw new InvalidOperationException($"Service '{bindType}' is not registered for tenant '{tenantIdentifier}'.");

        PropertyInfo valueProperty = bindType.GetProperty(BindValuePropertyName)
            ?? throw new InvalidOperationException($"Type '{bindType}' has no public '{BindValuePropertyName}' property.");

        // Read the property VALUE from the resolved instance; casting the PropertyInfo itself
        // (as the previous code did) always fails with InvalidCastException.
        if (valueProperty.GetValue(bind) is not TService service)
        {
            throw new InvalidOperationException($"'{bindType}.{BindValuePropertyName}' did not return a '{typeof(TService)}' instance.");
        }

        return service;
    }
}

/// <summary>
/// Initializes one MassTransit IBus per tenant.
/// </summary>
/// <remarks>
/// For every tenant returned by the config data provider a dedicated bus interface type is generated
/// and the generic <c>AddMassTransit&lt;TBus&gt;</c> registration is invoked for it through reflection,
/// because the bus types are not known at compile time.
/// </remarks>
public class BusControlFactory
{
    private readonly IMassTransitConfigDataProvider massTransitConfigDataProvider;
    private readonly IServiceCollection services;
    private readonly Assembly[] consumerAssemblies;

    /// <summary>
    /// Maps a tenant id to the bus interface type generated for it. Filled by <see cref="Initialize"/>.
    /// </summary>
    internal IDictionary<string, Type> BusTypes { get; } = new Dictionary<string, Type>();

    /// <summary>
    /// Creates the factory.
    /// </summary>
    /// <param name="massTransitConfigDataProvider">Source of the per-tenant connection settings.</param>
    /// <param name="services">Service collection the buses are registered into.</param>
    /// <param name="consumerAssemblies">Assemblies passed to <c>AddConsumers</c> for every tenant bus.</param>
    public BusControlFactory(IMassTransitConfigDataProvider massTransitConfigDataProvider, IServiceCollection services, Assembly[] consumerAssemblies)
    {
        this.massTransitConfigDataProvider = massTransitConfigDataProvider;
        this.services = services;
        this.consumerAssemblies = consumerAssemblies;
    }

    /// <summary>
    /// Registers a MassTransit bus for every configured tenant and records its bus type in
    /// <see cref="BusTypes"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The <c>AddMassTransit&lt;TBus&gt;</c> extension method could not be located.
    /// </exception>
    public void Initialize()
    {
        MethodInfo addMassTransitDefinition = FindAddMassTransitForBusType();

        // execute AddMassTransit<TBus> for every tenant
        foreach (MassTransitTenantConfigData tenantConfigData in massTransitConfigDataProvider.TenantConfigDatas)
        {
            Type busType = TenantBusInterfaceGenerator.CreateTenantBusInterface(tenantConfigData.TenantId);
            BusTypes[tenantConfigData.TenantId] = busType;

            // MakeGenericMethod throws on failure and never returns null, so no null check is needed.
            MethodInfo addMassTransitMethod = addMassTransitDefinition.MakeGenericMethod(busType);

            // Configuration applied to this tenant's bus; captures the tenant's own connection data.
            Action<IBusRegistrationConfigurator> configureAction =
                busConfigurator =>
                {
                    busConfigurator.SetKebabCaseEndpointNameFormatter();
                    busConfigurator.AddConsumers(consumerAssemblies);

                    busConfigurator.UsingRabbitMq((context, busFactoryConfigurator) =>
                    {
                        busFactoryConfigurator.Host(tenantConfigData.Uri, hostConfigurator =>
                        {
                            hostConfigurator.Username(tenantConfigData.Username);
                            hostConfigurator.Password(tenantConfigData.Password);
                        });

                        busFactoryConfigurator.ConfigureEndpoints(context);
                    });
                };

            // invoke AddMassTransit<busType>; null target because it is a static extension method
            addMassTransitMethod.Invoke(null, [services, configureAction]);
        }
    }

    /// <summary>
    /// Finds the <c>AddMassTransit</c> overload that has exactly one generic type parameter whose
    /// only type constraint is <c>IBus</c>.
    /// </summary>
    private static MethodInfo FindAddMassTransitForBusType()
    {
        foreach (MethodInfo candidate in typeof(DependencyInjectionRegistrationExtensions).GetMethods())
        {
            if (candidate.Name != nameof(DependencyInjectionRegistrationExtensions.AddMassTransit) || !candidate.IsGenericMethodDefinition)
            {
                continue;
            }

            Type[] genericArguments = candidate.GetGenericArguments();
            if (genericArguments.Length != 1)
            {
                continue;
            }

            Type[] constraints = genericArguments[0].GetGenericParameterConstraints();
            if (constraints.Length == 1 && constraints[0].FullName == typeof(IBus).FullName)
            {
                return candidate;
            }
        }

        throw new InvalidOperationException(
            $"Could not find a generic {nameof(DependencyInjectionRegistrationExtensions.AddMassTransit)} overload with a single type parameter constrained to {typeof(IBus).FullName}.");
    }
}


/// <summary>
/// Emits, at runtime, an interface per tenant that extends the IBus interface.
/// </summary>
/// <remarks>
/// All interfaces are defined in a single dynamic, run-only assembly named "DynamicTenantBuses".
/// </remarks>
public static class TenantBusInterfaceGenerator
{
    private static readonly AssemblyBuilder AssemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(new AssemblyName("DynamicTenantBuses"), AssemblyBuilderAccess.Run);
    private static readonly ModuleBuilder ModuleBuilder = AssemblyBuilder.DefineDynamicModule("MainModule");

    // Types already emitted, keyed by the tenant name they were requested for. Defining the same
    // type name twice in one module throws, so repeated requests are served from this cache.
    private static readonly Dictionary<string, Type> GeneratedTypes = new();
    private static readonly object SyncRoot = new();

    /// <summary>
    /// Returns the bus interface for a tenant, emitting it on first request.
    /// </summary>
    /// <param name="tenantName">Tenant name; it is sanitized into the interface name <c>I{Name}Bus</c>.</param>
    /// <returns>A public interface type that extends <c>IBus</c>.</returns>
    /// <exception cref="ArgumentException">
    /// The name is null/whitespace or contains no ASCII letter or digit.
    /// </exception>
    public static Type CreateTenantBusInterface(string tenantName)
    {
        // Validates tenantName as a side effect, before it is used as a dictionary key.
        string interfaceName = $"I{ToValidClassName(tenantName)}Bus";

        lock (SyncRoot)
        {
            if (GeneratedTypes.TryGetValue(tenantName, out Type? existing))
            {
                return existing;
            }

            TypeBuilder typeBuilder = ModuleBuilder.DefineType(interfaceName, TypeAttributes.Public | TypeAttributes.Interface | TypeAttributes.Abstract);
            typeBuilder.AddInterfaceImplementation(typeof(IBus));

            Type created = typeBuilder.CreateTypeInfo().AsType();
            GeneratedTypes[tenantName] = created;
            return created;
        }
    }

    /// <summary>
    /// Converts arbitrary text into an identifier fragment: drops everything except ASCII letters,
    /// digits and whitespace, title-cases the words, removes the whitespace and prefixes an
    /// underscore when the result starts with a digit.
    /// </summary>
    private static string ToValidClassName(string input)
    {
        if (string.IsNullOrWhiteSpace(input))
        {
            // Second argument is the parameter NAME, not its value.
            throw new ArgumentException("Input cannot be null or whitespace.", nameof(input));
        }

        string cleaned = Regex.Replace(input, @"[^A-Za-z0-9\s]", "");

        TextInfo textInfo = CultureInfo.InvariantCulture.TextInfo;

        // Strip every whitespace character (not only U+0020) so tabs/newlines cannot leak into the name.
        string pascalCaseName = Regex.Replace(textInfo.ToTitleCase(cleaned), @"\s+", "");

        if (pascalCaseName.Length == 0)
        {
            // e.g. "---": nothing usable is left, and indexing [0] below would throw.
            throw new ArgumentException("Input must contain at least one ASCII letter or digit.", nameof(input));
        }

        if (char.IsDigit(pascalCaseName[0]))
        {
            pascalCaseName = "_" + pascalCaseName;
        }

        return pascalCaseName;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.