我无法解释以下错误:

问题描述 投票:0回答:1
MonitoringJobStateMachine 类:

using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;

namespace MainApp
{
    /// <summary>
    /// Saga state machine that runs a monitoring job in one region at a time.
    /// A <see cref="ProcessJob"/> message is published for the region at index
    /// <c>CurrentAttempt</c> of the saga's <c>Regions</c> list, guarded by a timeout.
    /// On failure or timeout the next region in the list is tried; once no region
    /// is left the saga moves to <see cref="Failed"/>.
    /// </summary>
    public class MonitoringJobStateMachine : MassTransitStateMachine<MonitoringJobState>
    {
        /// <summary>How long a single region gets before the attempt counts as timed out.</summary>
        private static readonly TimeSpan RegionTimeout = TimeSpan.FromSeconds(10);

        public State Submitted { get; private set; } = null!;
        public State Processing { get; private set; } = null!;
        public State Completed { get; private set; } = null!;
        public State Failed { get; private set; } = null!;

        public Event<JobSubmitted> JobSubmittedEvent { get; private set; } = default!;
        public Event<JobCompleted> JobCompletedEvent { get; private set; } = default!;
        public Event<JobFailed> JobFailedEvent { get; private set; } = default!;

        public Schedule<MonitoringJobState, JobTimeout> JobTimeoutSchedule { get; private set; } = default!;

        /// <summary>
        /// Declares the events, the timeout schedule and the state transitions.
        /// </summary>
        public MonitoringJobStateMachine()
        {
            InstanceState(x => x.CurrentState);

            Event(() => JobSubmittedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
                x.InsertOnInitial = true;
            });
            Event(() => JobCompletedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
            });
            Event(() => JobFailedEvent, x =>
            {
                x.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

            // Configure the timeout schedule; its token is stored in TimeoutTokenId.
            Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
            {
                s.Delay = RegionTimeout;
                s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

            // Initial transition: JobSubmitted -> publish ProcessJob and schedule the timeout.
            Initially(
                When(JobSubmittedEvent)
                    .Then(context =>
                    {
                        // Copy the values from the incoming message.
                        context.Saga.SubmittedAt = context.Message.Timestamp;
                        context.Saga.Regions = context.Message.Regions;
                        context.Saga.CurrentAttempt = 0;
                    })
                    .IfElse(context => HasRegionForCurrentAttempt(context.Saga),
                        start => start
                            .ThenAsync(async context =>
                            {
                                SelectCurrentRegion(context.Saga);
                                Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");
                                await PublishProcessJobAsync(context);
                            })
                            .Schedule(JobTimeoutSchedule,
                                ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                                ctx => RegionTimeout)
                            .TransitionTo(Processing),
                        // Without any region there is nothing to dispatch; previously this
                        // path dereferenced a null CurrentRegion.
                        noRegion => noRegion
                            .Then(context => LogRegionsExhausted(context.Saga))
                            .TransitionTo(Failed))
            );

            During(Processing,
                When(JobCompletedEvent)
                    .Then(ctx =>
                    {
                        Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
                    })
                    .Unschedule(JobTimeoutSchedule)
                    .TransitionTo(Completed),

                When(JobFailedEvent)
                    .Then(context =>
                    {
                        Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} failed in region {context.Message.Region}. Error: {context.Message.ErrorMessage}");
                        // Advance to the next region in the list.
                        context.Saga.CurrentAttempt++;
                    })
                    .IfElse(context => HasRegionForCurrentAttempt(context.Saga),
                        retry => retry
                            .ThenAsync(async context =>
                            {
                                SelectCurrentRegion(context.Saga);
                                Console.WriteLine($"[Saga] Fallback: Retrying job {context.Saga.CorrelationId} in region {context.Saga.CurrentRegion}.");
                                await PublishProcessJobAsync(context);
                            })
                            // A new timeout is only needed when a new attempt was started.
                            .Schedule(JobTimeoutSchedule,
                                ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                                ctx => RegionTimeout)
                            .TransitionTo(Processing),
                        exhausted => exhausted
                            .Then(context => LogRegionsExhausted(context.Saga))
                            // Cancel the pending timeout so it is not delivered to a saga
                            // in the Failed state, where no handler is declared for it.
                            .Unschedule(JobTimeoutSchedule)
                            .TransitionTo(Failed)),

                When(JobTimeoutSchedule.Received)
                    .Then(context =>
                    {
                        Console.WriteLine($"[Saga] Timeout in region {context.Saga.CurrentRegion} for job {context.Saga.CorrelationId}.");
                        // A timeout counts as a failed attempt as well.
                        context.Saga.CurrentAttempt++;
                    })
                    .IfElse(context => HasRegionForCurrentAttempt(context.Saga),
                        retry => retry
                            .ThenAsync(async context =>
                            {
                                SelectCurrentRegion(context.Saga);
                                Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {context.Saga.CorrelationId} in region {context.Saga.CurrentRegion}.");
                                await PublishProcessJobAsync(context);
                            })
                            .Schedule(JobTimeoutSchedule,
                                ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
                                ctx => RegionTimeout)
                            .TransitionTo(Processing),
                        // Previously this path rescheduled the timeout forever and never
                        // left Processing once every region had been tried.
                        exhausted => exhausted
                            .Then(context => LogRegionsExhausted(context.Saga))
                            .TransitionTo(Failed))
            );
        }

        /// <summary>
        /// Returns true when <c>Regions</c> holds an entry at index <c>CurrentAttempt</c>.
        /// </summary>
        private static bool HasRegionForCurrentAttempt(MonitoringJobState saga) =>
            saga.Regions != null && saga.CurrentAttempt < saga.Regions.Count;

        /// <summary>
        /// Sets <c>CurrentRegion</c> to the region at index <c>CurrentAttempt</c>.
        /// Call only after <see cref="HasRegionForCurrentAttempt"/> returned true.
        /// </summary>
        private static void SelectCurrentRegion(MonitoringJobState saga)
        {
            saga.CurrentRegion = saga.Regions![saga.CurrentAttempt];
        }

        /// <summary>Logs that every configured region has been tried without success.</summary>
        private static void LogRegionsExhausted(MonitoringJobState saga)
        {
            Console.WriteLine($"[Saga] Job {saga.CorrelationId} failed: no region left to try.");
        }

        /// <summary>
        /// Publishes a <see cref="ProcessJob"/> message for the saga's current region
        /// and attempt, with the destination address set to <c>queue:jobs-{region}</c>.
        /// </summary>
        /// <param name="context">Behavior context of the event being handled.</param>
        /// <returns>The task returned by the publish call.</returns>
        private static System.Threading.Tasks.Task PublishProcessJobAsync(BehaviorContext<MonitoringJobState> context)
        {
            var saga = context.Saga;

            return context.Publish<ProcessJob>(
                new ProcessJobCommand
                {
                    CorrelationId = saga.CorrelationId,
                    Region = saga.CurrentRegion,
                    Attempt = saga.CurrentAttempt
                },
                publishContext =>
                {
                    // NOTE(review): the message is still published, not sent; whether setting
                    // DestinationAddress here actually routes it to the regional queue should
                    // be confirmed. A send endpoint may be what is intended.
                    // Invariant lower-casing keeps the queue name independent of the host culture.
                    publishContext.DestinationAddress = new Uri($"queue:jobs-{saga.CurrentRegion.ToLowerInvariant()}");
                });
        }
    }
}

Solution:
我在注册(AddMassTransit)代码中有一处错误,这就是出现该错误的原因。

代码:

        // Registers MassTransit: the monitoring-job saga with a Redis repository,
        // RabbitMQ as transport, and message scheduling on the bus.
        services.AddMassTransit(registration =>
        {
            var sagaRegistration = registration.AddSagaStateMachine<MonitoringJobStateMachine, MonitoringJobState>();
            sagaRegistration.RedisRepository(redis =>
            {
                // Address of the Redis server.
                redis.DatabaseConfiguration("localhost:6379");
                // Optional: prefix for the Redis keys.
                redis.KeyPrefix = "saga:";
            });

            registration.UsingRabbitMq((busContext, bus) =>
            {
                var hostAddress = new Uri(rabbitHost);
                bus.Host(hostAddress, host =>
                {
                    host.Username(rabbitUser);
                    host.Password(rabbitPass);
                });

                // NOTE(review): two schedulers are configured here (delayed messages and
                // an external "queue:scheduler" endpoint); presumably only one is
                // intended - confirm which one the saga timeouts should use.
                bus.UseDelayedMessageScheduler();

                var schedulerAddress = new Uri("queue:scheduler");
                bus.UseMessageScheduler(schedulerAddress);

                bus.ConfigureEndpoints(busContext);
            });
        });
c# message-queue masstransit
1个回答
0
投票

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.