.NET Mass Transit - 状态机 - 在最终状态期间不处理NotificationFinished 事件

问题描述 投票:0回答:1

我正在使用 Azure 服务总线和 MassTransit 库。使用状态机时我遇到了一个非常奇怪的错误。 NewDataRequested、OldDateRequested 消息同时触发,并且超时时间可能不同。然后调用NotificationRequested -> Finished。一旦所有这些消息用完,我就会切换到使用复合事件完成。这将导致我的消息被发布。

public class DataState : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }

        public int CurrentState { get; set; }

        public int ReadyEventStatus { get; set; }

        public Guid RequestId { get; set; }
        public bool IsError { get; set; }
    }

    public class DataStateMachine : MassTransitStateMachine<DataState>
    {
        public Event<NewDataRequested>? NewDataRequested { get; }
        public Event<Fault<NewDataRequested>>? NewDataRequestedFault { get; }

        public Event<NewDataFinished>? NewDataFinished { get; }
        public Event<Fault<NewDataFinished>>? NewDataFinishedFault { get; }

        public Event<OldDataRequested>? OldDataRequested { get; }
        public Event<Fault<OldDataRequested>>? OldDataRequestedFault { get; }

        public Event<OldDataFinished>? OldDataFinished { get; }
        public Event<Fault<OldDataFinished>>? OldDataFinishedFault { get; }

        public Event<NotificationRequested>? NotificationRequested { get; }
        public Event<Fault<NotificationRequested>>? NotificationRequestedFault { get; }

        public Event<NotificationFinished>? NotificationFinished { get; }
        public Event<Fault<NotificationFinished>>? NotificationFinishedFault { get; }

        public MassTransit.Event? Finished { get; }

        public RunDataStateMachine(ILogger<RunDataStateMachine> logger)
        {
            InstanceState(x => x.CurrentState);

            Event(() => NewDataRequested, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => NewDataRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => NewDataFinished, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => NewDataFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => OldDataRequested, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => OldDataRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => OldDataFinished, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => OldDataFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => NotificationRequested, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => NotificationRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Event(() => NotificationFinished, x => x.CorrelateById(context => context.Message.RequestId));
            Event(() => NotificationFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));

            Initially(
                 When(NewDataRequested)
                    .Then(x =>
                    {
                        x.Saga.RequestId = x.Message.RequestId;
                    }),
                 When(NewDataRequestedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                 When(NewDataFinished),
                 When(NewDataFinishedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                 When(OldDataRequested),
                 When(OldDataRequestedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                 When(OldDataFinished),
                 When(OldDataFinishedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                When(NotificationRequested),
                When(NotificationRequestedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),

                When(NotificationFinished),
                When(NotificationFinishedFault)
                   .Then(x =>
                   {
                       if (!x.Saga.IsError)
                       {
                           x.Publish(Fault(x.Saga));
                           x.Saga.IsError = true;
                       }
                   }),
            );

            CompositeEvent(() => Finished,
                x => x.ReadyEventStatus,
                CompositeEventOptions.IncludeInitial,
                NewDataRequested,
                NewDataFinished,
                OldDataRequested,
                OldDataFinished,
                NotificationRequested,
                NotificationFinished
            );

            During(Initial,
                When(Finished)
                .Then(x =>
                {
                    x.Publish(new DataFinished() { RequestId = x.Saga.RequestId });
                })
                .Finalize()
            );

            SetCompletedWhenFinalized();
        }

        private FaultMessage Fault(DataState runDataState)
        {
            return new FaultMessage() { RequestId = runDataState.RequestId };
        }
    }

错误发生的方式确实不同,而且并不经常发生。它通过以下错误和堆栈跟踪保留在错误队列状态机中。

MT-Fault-ExceptionType - MassTransit.UnhandledEventException
MT-Fault-Message - The NotificationFinished event is not handled during the Final state for the DataStateMachine state machine
MT-Fault-StackTrace - at MassTransit.MassTransitStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 222 at MassTransit.MassTransitStateMachine`1.UnhandledEvent(BehaviorContext`1 context, State state) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 1266 at MassTransit.MassTransitStateMachine`1.<.ctor>b__19_2(BehaviorContext`1 context, State state) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 55 at MassTransit.MassTransitStateMachine`1.StateMachineState.MassTransit.State<TInstance>.Raise[T](BehaviorContext`2 context) in /_/src/MassTransit/SagaStateMachine/SagaStateMachine/StateMachineState.cs:line 165 at MassTransit.MassTransitStateMachine`1.MassTransit.StateMachine<TInstance>.RaiseEvent[T](BehaviorContext`2 context) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 135 at MassTransit.Middleware.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context, IPipe`1 next) in /_/src/MassTransit/Middleware/StateMachineSagaMessageFilter.cs:line 66

我不完全明白可能出了什么问题。我尝试寻找 monge 解决方案和类似的场景。我发现问题可能出在顺序上,但我不这么认为,因为它不应该在这里产生影响。

非常感谢

c# .net azureservicebus masstransit state-machine
1个回答
0
投票

您遇到的错误“在 DataStateMachine 状态机的最终状态期间未处理 NotificationFinished 事件”通常在事件在未预期或未定义的状态下触发时发生。具体来说,当状态机已处于

NotificationFinished
状态时,将引发
Final
事件,此时不应处理进一步的转换或事件。

要在

Final
状态下优雅地处理此错误,您可以显式定义此类事件的行为,以避免出现未处理的异常。以下是如何在状态机代码中实现此功能的示例:

During(Final,
    When(NotificationFinished)
        .Then(x => 
        {
            
            Console.WriteLine("NotificationFinished event received in Final state.");
        })
        .Finalize() 
);

Finished
CompositeEvent
触发:

CompositeEvent(() => Finished,
    x => x.ReadyEventStatus,
    CompositeEventOptions.IncludeInitial,
    NewDataRequested,
    NewDataFinished,
    OldDataRequested,
    OldDataFinished,
    NotificationRequested,
    NotificationFinished
);

为了确保在

Final
状态下不会触发任何事件,您可以配置事件处理,如下所示:

Event(() => NotificationFinished, x => 
    x.CorrelateById(context => context.Message.RequestId)
     .OnMissingInstance(m => m.Discard())
);


有关在 MassTransit 中实现状态机的更多参考,请参阅 MassTransit 状态机文档

我已经提到了这个MassTransit,git@Chris Patterson和Stackreference

public class DataStateMachine : MassTransitStateMachine<DataState> 
{ 
public State Initial { get; private set; } 
 public State Final { get; private set; }
 
  // Handle Finished event and transition to Final state
        During(Initial,
            When(Finished)
                .Then(context => context.Publish(new DataFinished { RequestId = context.Saga.RequestId }))
                .TransitionTo(Final)
                .Finalize()
        );

        SetCompletedWhenFinalized();
    }

    private void HandleFault<T>(BehaviorContext<DataState, Fault<T>> context) where T : class
    {
        if (!context.Saga.IsError)
        {
            context.Publish(new FaultMessage { RequestId = context.Saga.RequestId });
            context.Saga.IsError = true;
        }
    }
}


enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.