我正在使用 Azure 服务总线和 MassTransit 库。使用状态机时我遇到了一个非常奇怪的错误。 NewDataRequested、OldDateRequested 消息同时触发,并且超时时间可能不同。然后调用NotificationRequested -> Finished。一旦所有这些消息用完,我就会切换到使用复合事件完成。这将导致我的消息被发布。
public class DataState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public int CurrentState { get; set; }
public int ReadyEventStatus { get; set; }
public Guid RequestId { get; set; }
public bool IsError { get; set; }
}
public class DataStateMachine : MassTransitStateMachine<DataState>
{
public Event<NewDataRequested>? NewDataRequested { get; }
public Event<Fault<NewDataRequested>>? NewDataRequestedFault { get; }
public Event<NewDataFinished>? NewDataFinished { get; }
public Event<Fault<NewDataFinished>>? NewDataFinishedFault { get; }
public Event<OldDataRequested>? OldDataRequested { get; }
public Event<Fault<OldDataRequested>>? OldDataRequestedFault { get; }
public Event<OldDataFinished>? OldDataFinished { get; }
public Event<Fault<OldDataFinished>>? OldDataFinishedFault { get; }
public Event<NotificationRequested>? NotificationRequested { get; }
public Event<Fault<NotificationRequested>>? NotificationRequestedFault { get; }
public Event<NotificationFinished>? NotificationFinished { get; }
public Event<Fault<NotificationFinished>>? NotificationFinishedFault { get; }
public MassTransit.Event? Finished { get; }
public RunDataStateMachine(ILogger<RunDataStateMachine> logger)
{
InstanceState(x => x.CurrentState);
Event(() => NewDataRequested, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => NewDataRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => NewDataFinished, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => NewDataFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => OldDataRequested, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => OldDataRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => OldDataFinished, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => OldDataFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => NotificationRequested, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => NotificationRequestedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Event(() => NotificationFinished, x => x.CorrelateById(context => context.Message.RequestId));
Event(() => NotificationFinishedFault, x => x.CorrelateById(context => context.Message.Message.RequestId));
Initially(
When(NewDataRequested)
.Then(x =>
{
x.Saga.RequestId = x.Message.RequestId;
}),
When(NewDataRequestedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(NewDataFinished),
When(NewDataFinishedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(OldDataRequested),
When(OldDataRequestedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(OldDataFinished),
When(OldDataFinishedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(NotificationRequested),
When(NotificationRequestedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
When(NotificationFinished),
When(NotificationFinishedFault)
.Then(x =>
{
if (!x.Saga.IsError)
{
x.Publish(Fault(x.Saga));
x.Saga.IsError = true;
}
}),
);
CompositeEvent(() => Finished,
x => x.ReadyEventStatus,
CompositeEventOptions.IncludeInitial,
NewDataRequested,
NewDataFinished,
OldDataRequested,
OldDataFinished,
NotificationRequested,
NotificationFinished
);
During(Initial,
When(Finished)
.Then(x =>
{
x.Publish(new DataFinished() { RequestId = x.Saga.RequestId });
})
.Finalize()
);
SetCompletedWhenFinalized();
}
private FaultMessage Fault(DataState runDataState)
{
return new FaultMessage() { RequestId = runDataState.RequestId };
}
}
错误发生的方式确实不同,而且并不经常发生。它通过以下错误和堆栈跟踪保留在错误队列状态机中。
MT-Fault-ExceptionType - MassTransit.UnhandledEventException
MT-Fault-Message - The NotificationFinished event is not handled during the Final state for the DataStateMachine state machine
MT-Fault-StackTrace - at MassTransit.MassTransitStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 222 at MassTransit.MassTransitStateMachine`1.UnhandledEvent(BehaviorContext`1 context, State state) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 1266 at MassTransit.MassTransitStateMachine`1.<.ctor>b__19_2(BehaviorContext`1 context, State state) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 55 at MassTransit.MassTransitStateMachine`1.StateMachineState.MassTransit.State<TInstance>.Raise[T](BehaviorContext`2 context) in /_/src/MassTransit/SagaStateMachine/SagaStateMachine/StateMachineState.cs:line 165 at MassTransit.MassTransitStateMachine`1.MassTransit.StateMachine<TInstance>.RaiseEvent[T](BehaviorContext`2 context) in /_/src/MassTransit/SagaStateMachine/MassTransitStateMachine.cs:line 135 at MassTransit.Middleware.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context, IPipe`1 next) in /_/src/MassTransit/Middleware/StateMachineSagaMessageFilter.cs:line 66
我不完全明白可能出了什么问题。我尝试寻找 monge 解决方案和类似的场景。我发现问题可能出在顺序上,但我不这么认为,因为它不应该在这里产生影响。
非常感谢
您遇到的错误“在 DataStateMachine 状态机的最终状态期间未处理 NotificationFinished 事件”通常在事件在未预期或未定义的状态下触发时发生。具体来说,当状态机已处于
NotificationFinished
状态时,将引发 Final
事件,此时不应处理进一步的转换或事件。
要在
Final
状态下优雅地处理此错误,您可以显式定义此类事件的行为,以避免出现未处理的异常。以下是如何在状态机代码中实现此功能的示例:
During(Final,
When(NotificationFinished)
.Then(x =>
{
Console.WriteLine("NotificationFinished event received in Final state.");
})
.Finalize()
);
Finished
由 CompositeEvent
触发:
CompositeEvent(() => Finished,
x => x.ReadyEventStatus,
CompositeEventOptions.IncludeInitial,
NewDataRequested,
NewDataFinished,
OldDataRequested,
OldDataFinished,
NotificationRequested,
NotificationFinished
);
为了确保在
Final
状态下不会触发任何事件,您可以配置事件处理,如下所示:
Event(() => NotificationFinished, x =>
x.CorrelateById(context => context.Message.RequestId)
.OnMissingInstance(m => m.Discard())
);
有关在 MassTransit 中实现状态机的更多参考,请参阅 MassTransit 状态机文档。
我已经提到了这个MassTransit,git@Chris Patterson和Stackreference。
public class DataStateMachine : MassTransitStateMachine<DataState>
{
public State Initial { get; private set; }
public State Final { get; private set; }
// Handle Finished event and transition to Final state
During(Initial,
When(Finished)
.Then(context => context.Publish(new DataFinished { RequestId = context.Saga.RequestId }))
.TransitionTo(Final)
.Finalize()
);
SetCompletedWhenFinalized();
}
private void HandleFault<T>(BehaviorContext<DataState, Fault<T>> context) where T : class
{
if (!context.Saga.IsError)
{
context.Publish(new FaultMessage { RequestId = context.Saga.RequestId });
context.Saga.IsError = true;
}
}
}