状态机成功处理响应后收到 MassTransit 请求超时事件

问题描述 投票:0回答:1

我有一个状态机,在收到初始事件后会发出一个请求,等待响应,然后转换到最终状态。它基本按预期工作,只是在状态机已经处理了响应、saga 已更新并且状态已变为“Final”之后,仍然会收到超时事件。我认为这不是预期的行为。

异常是可以理解的,但超时过期事件不应该发生。

MassTransit.NotAcceptedStateMachineException: MassTransitExample.Models.Book(a16bd901-3ed9-4633-8e0c-4b00d8babeff) Saga exception on receipt of MassTransit.Contracts.RequestTimeoutExpired<MassTransitExample.Messages.TestRequest>: Not accepted in state Final

状态机

/// <summary>
/// Saga state machine for <see cref="Book"/>. When <see cref="BookAdded"/> arrives it copies the
/// message data onto the saga, sends a <see cref="TestRequest"/> and waits in the request's
/// pending state; once the response completes it stores the response text and finalizes.
/// </summary>
public class BookStateMachine : MassTransitStateMachine<Book>
{
    public BookStateMachine()
    {
        // BookAdded is correlated to a saga instance by the book id carried in the message.
        Event(() => Added, x => x.CorrelateById(m => m.Message.BookId));

        Request(() => TestRequest, r =>
        {
            r.Completed = e => e.ConfigureConsumeTopology = false;
            r.Faulted = e => e.ConfigureConsumeTopology = false;
            //r.TimeoutExpired = e => e.ConfigureConsumeTopology = false;
        });

        InstanceState(x => x.CurrentState, Available);

        Initially(
            When(Added)
                .CopyDataToInstance()
                .Request(TestRequest, ctx => new TestRequest()
                {
                    BookId = ctx.Message.BookId
                })
                .TransitionTo(TestRequest.Pending));

        During(TestRequest.Pending,
            When(TestRequest.Completed)
                .Then(ctx =>
                {
                    ctx.Saga.ResponseText = ctx.Message.ResponseText;
                })
                .TransitionTo(Final));

        // The request's timeout message can still be delivered after the response was handled
        // and the saga reached Final. The instance is not removed on finalization here, so
        // without an explicit handler that late RequestTimeoutExpired<TestRequest> is rejected
        // with NotAcceptedStateMachineException ("Not accepted in state Final").
        // NOTE(review): presumably the scheduled timeout is not cancelled by the configured
        // scheduler - confirm against the MassTransit scheduling documentation.
        During(Final,
            Ignore(TestRequest.TimeoutExpired));
    }

    /// <summary>Initial event that creates the saga and triggers the request.</summary>
    public Event<BookAdded> Added { get; private set; }

    /// <summary>Request/response pair issued by the saga after <see cref="Added"/>.</summary>
    public Request<Book, TestRequest, TestResponse> TestRequest { get; } = null!;

    /// <summary>State passed to <c>InstanceState</c>; no transition in this class targets it.</summary>
    public State Available { get; private set; }
}

/// <summary>
/// Activity helpers for the <see cref="Book"/> saga state machine.
/// </summary>
public static class BookStateMachineExtensions
{
    /// <summary>
    /// Appends an activity that copies the fields of a <see cref="BookAdded"/> message onto the saga.
    /// </summary>
    /// <param name="binder">The event activity binder to extend.</param>
    /// <returns>The binder with the copy activity appended.</returns>
    public static EventActivityBinder<Book, BookAdded> CopyDataToInstance(
        this EventActivityBinder<Book, BookAdded> binder) =>
        binder.Then(context =>
        {
            var saga = context.Saga;
            var added = context.Message;

            saga.BookId = added.BookId;
            // Only the date part of the timestamp is kept.
            saga.DateAdded = added.Timestamp.Date;
            saga.Title = added.Title;
            saga.Isbn = added.Isbn;
        });
}

DbContext

/// <summary>
/// Saga DbContext exposing the <see cref="Book"/> saga set and its class map.
/// </summary>
public class TestDbContext(DbContextOptions options) : SagaDbContext(options)
{
    public DbSet<Book> Books { get; set; }

    /// <summary>The saga class maps applied to the model; currently only <see cref="BookStateMap"/>.</summary>
    protected override IEnumerable<ISagaClassMap> Configurations =>
        new ISagaClassMap[] { new BookStateMap() };

    /// <summary>Applies every saga class map from <see cref="Configurations"/> to the model.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        foreach (var classMap in Configurations)
        {
            classMap.Configure(modelBuilder);
        }
    }
}

/// <summary>
/// Entity mapping for the <see cref="Book"/> saga instance.
/// </summary>
public class BookStateMap : SagaClassMap<Book>
{
    /// <summary>Configures the key, required columns and row version of the saga entity.</summary>
    /// <param name="entity">Builder for the <see cref="Book"/> entity.</param>
    /// <param name="model">The model builder (not used by this map).</param>
    protected override void Configure(EntityTypeBuilder<Book> entity, ModelBuilder model)
    {
        entity.HasKey(book => book.CorrelationId);

        // State column: length-limited and mandatory.
        entity.Property(book => book.CurrentState)
            .HasMaxLength(64)
            .IsRequired();

        // Data copied from the BookAdded message.
        entity.Property(book => book.BookId).IsRequired();
        entity.Property(book => book.DateAdded).IsRequired();
        entity.Property(book => book.Title).IsRequired();
        entity.Property(book => book.Isbn).IsRequired();

        // Mapped without a required constraint.
        entity.Property(book => book.ResponseText);

        entity.Property(book => book.RowVersion).IsRowVersion();
    }
}

配置

// EF Core context that the saga repository below reuses.
builder.Services.AddDbContext<TestDbContext>(
    dbOptions => dbOptions.UseSqlServer(connectionString));

builder.Services.AddMassTransit(registration =>
{
    registration.AddDelayedMessageScheduler();

    // Consumers are registered from the entry assembly.
    registration.AddConsumers(Assembly.GetEntryAssembly());

    registration.AddSagaStateMachine<BookStateMachine, Book>()
        .EntityFrameworkRepository(repository =>
        {
            repository.ConcurrencyMode = ConcurrencyMode.Optimistic;

            repository.ExistingDbContext<TestDbContext>();
            repository.UseSqlServer();
        });

    registration.UsingRabbitMq((context, rabbit) =>
    {
        rabbit.UseDelayedMessageScheduler();

        rabbit.Host("localhost", "/", host =>
        {
            host.Username("guest");
            host.Password("guest");
        });

        rabbit.ConfigureEndpoints(context);
    });
});

消费者和消息契约

/// <summary>
/// Consumes <see cref="TestRequest"/> and, after a fixed three-second pause, responds with a
/// <see cref="TestResponse"/> carrying the same book id.
/// </summary>
public class TestRequestConsumer : IConsumer<TestRequest>
{
    /// <summary>Waits three seconds, then responds to the request.</summary>
    /// <param name="context">Consume context of the incoming request.</param>
    public async Task Consume(ConsumeContext<TestRequest> context)
    {
        // Fixed pause before the response is sent.
        await Task.Delay(TimeSpan.FromSeconds(3));

        var response = new TestResponse
        {
            BookId = context.Message.BookId,
            ResponseText = "Responded"
        };

        await context.RespondAsync(response);
    }
}

/// <summary>
/// Message announcing that a book was added. The state machine correlates it to a saga
/// instance by <see cref="BookId"/> and copies its fields onto the saga.
/// </summary>
public record BookAdded
{
    /// <summary>Identifier of the book; used to correlate the event to a saga instance.</summary>
    public Guid BookId { get; init; }
    /// <summary>Title of the book.</summary>
    public string Title { get; init; }
    /// <summary>ISBN of the book.</summary>
    public string Isbn { get; init; }
    /// <summary>Time of the event; the saga keeps only its date part as <c>DateAdded</c>.</summary>
    public DateTime Timestamp { get; init; }
}

/// <summary>
/// Request message sent by the state machine when <see cref="BookAdded"/> is handled and
/// consumed by <see cref="TestRequestConsumer"/>.
/// </summary>
public record TestRequest
{
    /// <summary>Identifier of the book the request is about.</summary>
    public Guid BookId { get; init; }
}

/// <summary>
/// Response to <see cref="TestRequest"/>, sent by <see cref="TestRequestConsumer"/>.
/// </summary>
public record TestResponse
{
    /// <summary>Identifier of the book, echoed from the request.</summary>
    public Guid BookId { get; init; }
    /// <summary>Response text; the state machine stores it on the saga when the request completes.</summary>
    public string ResponseText { get; init; }
}
masstransit
1个回答
0
投票

我花了一些时间,最终找到了一个 GitHub 讨论,它很好地解释了这个问题。

我希望这对其他人也有帮助 https://github.com/MassTransit/MassTransit/discussions/4314

© www.soinside.com 2019 - 2024. All rights reserved.