using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;
namespace MainApp
{
public class MonitoringJobStateMachine : MassTransitStateMachine<MonitoringJobState>
{
public State Submitted { get; private set; } = null!;
public State Processing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;
public Event<JobSubmitted> JobSubmittedEvent { get; private set; } = default!;
public Event<JobCompleted> JobCompletedEvent { get; private set; } = default!;
public Event<JobFailed> JobFailedEvent { get; private set; } = default!;
public Schedule<MonitoringJobState, JobTimeout> JobTimeoutSchedule { get; private set; } = default!;
public MonitoringJobStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => JobSubmittedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
x.InsertOnInitial = true;
});
Event(() => JobCompletedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
Event(() => JobFailedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Konfiguriere den Timeout-Scheduler
Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(10);
s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Initialer Übergang: JobSubmitted -> sende ProcessJob und plane Timeout
Initially(
When(JobSubmittedEvent)
.ThenAsync(async context =>
{
// Werte aus der eingehenden Nachricht übernehmen
context.Saga.SubmittedAt = context.Message.Timestamp;
context.Saga.Regions = context.Message.Regions;
context.Saga.CurrentAttempt = 0;
if (context.Saga.Regions != null && context.Saga.Regions.Count > 0)
context.Saga.CurrentRegion = context.Saga.Regions[0];
Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");
// Sende den ProcessJob-Befehl an die regionalspezifische Queue (z. B. "jobs-de")
await context.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = context.Saga.CorrelationId,
Region = context.Saga.CurrentRegion,
Attempt = context.Saga.CurrentAttempt
}, publishContext =>
{
// Setzt explizit die Zieladresse für die Nachricht:
publishContext.DestinationAddress = new Uri($"queue:jobs-{context.Saga.CurrentRegion.ToLower()}");
});
})
// Plane den Timeout direkt in der Kette
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.TransitionTo(Processing)
);
During(Processing,
When(JobCompletedEvent)
.Then(ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
})
.Unschedule(JobTimeoutSchedule)
.TransitionTo(Completed),
When(JobFailedEvent)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} failed in region {ctx.Message.Region}. Error: {ctx.Message.ErrorMessage}");
// Erhöhe den Versuchszähler
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
// Setze die neue Region
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
// Sende den neuen ProcessJob-Befehl an die neue regionale Queue
await ctx.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
// Plane nach einem Fehlschlag (bzw. nach Timeout) einen neuen Timeout
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.IfElse(ctx => ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count,
binder => binder.TransitionTo(Processing),
binder => binder.TransitionTo(Failed)
),
When(JobTimeoutSchedule.Received)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Timeout in region {ctx.Saga.CurrentRegion} for job {ctx.Saga.CorrelationId}.");
// Bei Timeout wird der Versuch ebenfalls erhöht und ein Fallback eingeleitet
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
await ctx.Publish<ProcessJob>(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
);
}
}
}
Solution:i在注册中有错误,这就是为什么发生错误的原因。
代码:
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<MonitoringJobStateMachine, MonitoringJobState>()
.RedisRepository(r =>
{
r.DatabaseConfiguration("localhost:6379"); // Redis-Server-Adresse
r.KeyPrefix = "saga:"; // Optional: Präfix für Redis-Schlüssel
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUser);
h.Password(rabbitPass);
});
cfg.UseDelayedMessageScheduler();
cfg.UseMessageScheduler(new Uri("queue:scheduler"));
cfg.ConfigureEndpoints(context);
});
});