我有一个状态机,在收到初始事件后,它将发出一个请求,应该等待响应并转换到最终状态。它按预期工作,除了它会收到超时事件,尽管响应是由状态机处理的,传奇已更新并且状态已更改为“最终”。我认为这不是预期的行为。
异常是可以理解的,但超时过期事件不应该发生。
MassTransit.NotAcceptedStateMachineException: MassTransitExample.Models.Book(a16bd901-3ed9-4633-8e0c-4b00d8babeff) Saga exception on receipt of MassTransit.Contracts.RequestTimeoutExpired<MassTransitExample.Messages.TestRequest>: Not accepted in state Final
状态机
public class BookStateMachine : MassTransitStateMachine<Book>
{
public BookStateMachine()
{
Event(() => Added, x => x.CorrelateById(m => m.Message.BookId));
Request(() => TestRequest, r =>
{
r.Completed = e => e.ConfigureConsumeTopology = false;
r.Faulted = e => e.ConfigureConsumeTopology = false;
//r.TimeoutExpired = e => e.ConfigureConsumeTopology = false;
});
InstanceState(x => x.CurrentState, Available);
Initially(
When(Added)
.CopyDataToInstance()
.Request(TestRequest, ctx => new TestRequest()
{
BookId = ctx.Message.BookId
})
.TransitionTo(TestRequest.Pending));
this.During(TestRequest.Pending,
When(TestRequest.Completed)
.Then(ctx =>
{
ctx.Saga.ResponseText = ctx.Message.ResponseText;
})
.TransitionTo(Final));
}
public Event<BookAdded> Added { get; private set; }
public Request<Book, TestRequest, TestResponse> TestRequest { get; } = null!;
public State Available { get; private set; }
}
public static class BookStateMachineExtensions
{
public static EventActivityBinder<Book, BookAdded> CopyDataToInstance(
this EventActivityBinder<Book, BookAdded> binder)
{
return binder
.Then(x =>
{
x.Saga.BookId = x.Message.BookId;
x.Saga.DateAdded = x.Message.Timestamp.Date;
x.Saga.Title = x.Message.Title;
x.Saga.Isbn = x.Message.Isbn;
});
}
}
DbContext
public class TestDbContext(DbContextOptions options) : SagaDbContext(options)
{
public DbSet<Book> Books { get; set; }
protected override IEnumerable<ISagaClassMap> Configurations
{
get
{
yield return new BookStateMap();
}
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
foreach (ISagaClassMap sagaMap in Configurations)
{
sagaMap.Configure(modelBuilder);
}
}
}
public class BookStateMap : SagaClassMap<Book>
{
protected override void Configure(EntityTypeBuilder<Book> entity, ModelBuilder model)
{
entity.HasKey(x => x.CorrelationId);
entity.Property(x => x.CurrentState).HasMaxLength(64);
entity.Property(x => x.BookId).IsRequired();
entity.Property(x => x.DateAdded).IsRequired();
entity.Property(x => x.Title).IsRequired();
entity.Property(x => x.Isbn).IsRequired();
entity.Property(x => x.ResponseText);
entity.Property(x => x.CurrentState).IsRequired();
entity.Property(x => x.RowVersion).IsRowVersion();
}
}
配置
builder.Services.AddDbContext<TestDbContext>(
options => options.UseSqlServer(connectionString));
builder.Services.AddMassTransit(x =>
{
x.AddDelayedMessageScheduler();
Assembly? entryAssembly = Assembly.GetEntryAssembly();
x.AddConsumers(entryAssembly);
x.AddSagaStateMachine<BookStateMachine, Book>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Optimistic;
r.ExistingDbContext<TestDbContext>();
r.UseSqlServer();
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.UseDelayedMessageScheduler();
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
消费者和合同
public class TestRequestConsumer : IConsumer<TestRequest>
{
public async Task Consume(ConsumeContext<TestRequest> context)
{
await Task.Delay(3000);
await context.RespondAsync(new TestResponse()
{
BookId = context.Message.BookId,
ResponseText = "Responded"
});
}
}
public record BookAdded
{
public Guid BookId { get; init; }
public string Title { get; init; }
public string Isbn { get; init; }
public DateTime Timestamp { get; init; }
}
public record TestRequest
{
public Guid BookId { get; init; }
}
public record TestResponse
{
public Guid BookId { get; init; }
public string ResponseText { get; init; }
}
我花了一些时间,但我找到了一个 github 讨论很好地解释了这个问题。
我希望这对其他人也有帮助 https://github.com/MassTransit/MassTransit/discussions/4314