在 ASP.NET Core 中安排 MassTransit 发布

问题描述 投票:0回答:1

我有一个使用 MassTransit 发布消息的 ASP.NET Core Web API,并且我正在添加 Hangfire 进行调度。我还希望能够查看 Hangfire 仪表板。使用如下所示的相关配置和发布代码,不会安排消息。我可以看到数据库中的 Hangfire 仪表板和表格。有谁知道这样做的完整示例或有任何建议吗?

配置:

builder.Services.AddHangfireServer()
                .AddHangfire(configuration => configuration.UseSqlServerStorage(...);

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((_, cfg) =>
    {
        cfg.Host(...);

        cfg.UsePublishMessageScheduler();
    });

    x.AddPublishMessageScheduler();
    x.AddHangfireConsumers();
});

app.UseHangfireDashboard()
   .UseHealthChecks("/health");

出版:

public class PublishingClass
{
    IMessageScheduler MessageScheduler { get; init; }

    public PublishingClass(IMessageScheduler messageScheduler)
    {
        MessageScheduler = messageScheduler;
    }

    public async Task Publish(Command command, DateTime sendDate)
    {
        await MessageScheduler.SchedulePublish(sendDate, command);
    }
}
asp.net-core masstransit hangfire
1个回答
0
投票

您可以尝试这个工作示例。
套餐

Hangfire
MassTransit
MassTransit.Hangfire
MassTransit.RabbitMQ
Microsoft.Data.SqlClient

程序.cs

builder.Services.AddHangfire(x => x.UseSqlServerStorage("Data Source=192.168.2.68;Initial Catalog=hangfireDB;User ID=sa;Password=xxxxx;TrustServerCertificate=True"));

builder.Services.AddMassTransit(x =>
{
    x.AddConsumers(typeof(Program).Assembly);

    x.AddPublishMessageScheduler();

    x.UsingRabbitMq((context, config) =>
    {
        config.UsePublishMessageScheduler();
        config.UseHangfireScheduler();

        config.Host("rabbitmq://xx.xxx.xxx.xxxx:5672", hostconfig =>
        {
            hostconfig.Username("guest");
            hostconfig.Password("guest");
        });
        config.ConfigureEndpoints(context);
    });
});

//builder.Services.AddOptions<MassTransitHostOptions>().Configure(options =>
//    {
//        options.WaitUntilStarted = true;
//    });
...
app.UseHangfireDashboard();

我的模型.cs

    public record MyModel
    {
        public string value { get; init; }
    }

Consumer1.cs

    public class Consumer1 : IConsumer<MyModel>  
    {
        public Task Consume(ConsumeContext<MyModel> context)
        {
            Console.WriteLine("Now is "+DateTime.Now.ToString());
            Console.WriteLine($"MassTransit.Consumer1 :{JsonSerializer.Serialize(context.Message)}");
            return Task.CompletedTask;
        }
    }

发布控制器

    [ApiController]
    public class ValuesController : ControllerBase
    {

        private readonly IMessageScheduler _scheduler;

        public ValuesController(IBus publishEndpoint,IMessageScheduler scheduler)
        {
            this._scheduler = scheduler;
        }
        [HttpGet("pub")]
        public async Task pub()
        {
            await _scheduler.SchedulePublish(DateTime.UtcNow.AddSeconds(10), new MyModel() { value = "message at "+DateTime.Now.ToString() });
        }
    }

测试

© www.soinside.com 2019 - 2024. All rights reserved.