我对 MassTransit 还很陌生,不明白我做错了什么会出现以下异常:
Messages types must not be System types
。
这是我的定义:
/// <summary>
/// Saga instance used by <c>ArcStateMachine</c>. It is stored through the MongoDB saga
/// repository configured for <c>ArcProcess</c>.
/// </summary>
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    /// <summary>Correlation id of this saga instance (member of <c>SagaStateMachineInstance</c>).</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state of the instance; bound via <c>InstanceState(x => x.CurrentState)</c>.</summary>
    public string CurrentState { get; set; }

    /// <summary>Version member required by <c>ISagaVersion</c>.</summary>
    public int Version { get; set; }

    /// <summary>Activation id copied from the incoming messages by the state machine.</summary>
    public Guid ActivationId { get; set; }
}
/// <summary>
/// One-time registration of the correlation ids used by the saga's message contracts.
/// </summary>
public static class MessageContracts
{
    // Becomes true after the registrations below have run once.
    static bool _initialized;

    /// <summary>
    /// Registers <c>ActivationId</c> as the correlation id of every message contract on the
    /// global send topology. Once this has completed, further calls do nothing.
    /// </summary>
    public static void Initialize()
    {
        if (!_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ReconstructionFinishedMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(message => message.ActivationId);

            _initialized = true;
        }
    }
}
我的 2 个消费者是:
/// <summary>
/// Consumes <c>StartProcessingMessage</c>, waits for a fixed delay that stands in for the
/// actual work, and then publishes <c>ReconstructionFinishedMessage</c> for the same activation.
/// </summary>
public class StartReconstructionConsumer : IConsumer<StartProcessingMessage>
{
    // Duration of the simulated work.
    private static readonly TimeSpan SimulatedWorkDuration = TimeSpan.FromSeconds(5);

    private readonly ILogger<StartReconstructionConsumer> _logger;

    /// <param name="logger">Logger used to trace the start and end of the simulated work.</param>
    public StartReconstructionConsumer(ILogger<StartReconstructionConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Handles one <c>StartProcessingMessage</c> and publishes the follow-up message.
    /// </summary>
    /// <param name="context">Consume context carrying the message and the publish endpoint.</param>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;

        // Message templates (not interpolated strings) keep ActivationId as a structured
        // log property and avoid formatting the string when the level is disabled.
        _logger.LogInformation("Received Scan: {ActivationId}", activationId);

        // Pass the consume context's token so the wait ends when the consume is cancelled.
        await Task.Delay(SimulatedWorkDuration, context.CancellationToken);

        _logger.LogInformation("Finish Scan: {ActivationId}", activationId);
        await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId });
    }
}
/// <summary>
/// Consumes <c>ProcessingFinishedMessage</c> and logs the activation that finished.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    private readonly ILogger<ProcessingFinishedConsumer> _logger;

    /// <param name="logger">Logger that receives the completion entry.</param>
    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Logs the activation id of the finished processing run.
    /// </summary>
    /// <param name="context">Consume context carrying the message.</param>
    /// <returns>An already-completed task; nothing asynchronous happens here.</returns>
    public Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        // Message template instead of an interpolated string keeps ActivationId structured.
        _logger.LogInformation("Finish {ActivationId}", context.Message.ActivationId);

        // No async work to await, so return the completed task directly.
        return Task.CompletedTask;
    }
}
这是状态机定义:
// State machine over ArcProcess instances.
// Flow defined in the constructor:
//   Initially: ProcessingStartedEvent -> copy ActivationId -> ProcessingStartedState
//   During ProcessingStartedState: ReconstructionFinishedEvent -> copy ActivationId
//     -> publish ProcessingFinishedMessage -> ProcessingFinishedState -> Finalize
public class ArcStateMachine: MassTransitStateMachine<ArcProcess>
{
// Static constructor: runs MessageContracts.Initialize() before the type is first used.
static ArcStateMachine()
{
MessageContracts.Initialize();
}
public ArcStateMachine()
{
// Bind the instance state to ArcProcess.CurrentState.
InstanceState(x => x.CurrentState);
Initially(
When(ProcessingStartedEvent)
.Then(context =>
{
Console.WriteLine(">> ProcessingStartedEvent");
context.Instance.ActivationId = context.Data.ActivationId;
})
.TransitionTo(ProcessingStartedState));
During(ProcessingStartedState,
When(ReconstructionFinishedEvent)
.Then(context =>
{
Console.WriteLine(">> ReconstructionFinishedEvent");
context.Instance.ActivationId = context.Data.ActivationId;
})
// NOTE(review): this is the call behind the reported fault
// "Messages types must not be System types: System.Threading.Tasks.Task<...ProcessingFinishedMessage>".
// context.Init<T>(...) yields a Task<ProcessingFinishedMessage>, and Publish treats that Task
// as the message itself. PublishAsync is the activity that accepts a message initializer.
.Publish(context =>
{
return context.Init<ProcessingFinishedMessage>(new { ActivationId = context.Data.ActivationId });
})
.TransitionTo(ProcessingFinishedState)
.Finalize());
}
// States. Only ProcessingStartedState and ProcessingFinishedState are referenced by the constructor.
public State ProcessingStartedState { get; }
public State ReconstructionStartedState { get; }
public State ReconstructionFinishedState { get; }
public State ProcessingFinishedState { get; }
// Events. Only ProcessingStartedEvent and ReconstructionFinishedEvent are handled by the constructor.
public Event<StartProcessingMessage> ProcessingStartedEvent { get; }
public Event<ReconstructionStartedMessage> ReconstructionStartedEvent { get; }
public Event<ReconstructionFinishedMessage> ReconstructionFinishedEvent { get; }
public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}
MassTransit 的设置如下所示:
// Everything below is registered only when a RabbitMQ host is configured.
var rabbitHost = Configuration["RABBIT_MQ_HOST"];
if (rabbitHost.IsNotEmpty())
{
    services.AddMassTransit(registration =>
    {
        // The state machine and its MongoDB repository are created by hand here and
        // attached to a dedicated receive endpoint further down.
        var mongoConnectionString = Configuration["MONGO_DB_CONNECTION_STRING"];
        var stateMachine = new ArcStateMachine();
        var sagaRepository = MongoDbSagaRepository<ArcProcess>.Create(
            mongoConnectionString,
            "mongoRepo",
            "WorkflowState");

        registration.AddConsumer<StartReconstructionConsumer>();
        registration.AddConsumer<ProcessingFinishedConsumer>();

        registration.UsingRabbitMq((busContext, rabbit) =>
        {
            rabbit.Host(new Uri(rabbitHost), host =>
            {
                host.Username("guest");
                host.Password("guest");
            });

            // Endpoints for the registered consumers.
            rabbit.ConfigureEndpoints(busContext);

            // Explicit endpoint hosting the saga.
            rabbit.ReceiveEndpoint(
                BusConstants.SagaQueue,
                endpoint => endpoint.StateMachineSaga(stateMachine, sagaRepository));
        });
    });

    services.AddMassTransitHostedService();

    services.AddSwaggerGen(swagger =>
    {
        swagger.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
    });
}
我有几个问题:
事件实际上是在何时通过发布消息触发的?例如,在我的示例中
await _BusInstance.Bus.Publish<StartProcessingMessage>(new { ActivationId = id });
是从 WebApi 调用的,该消息由 StartReconstructionConsumer
消费,但状态机实际上是在何时开始执行 Initially(When(ProcessingStartedEvent)...
的呢?
我的处理应确保我已经处于
ProcessingStartedState
状态,以便 During(ProcessingStartedState, When(ReconstructionFinishedEvent)...
正确执行。那么,我如何确保在收到 StartProcessingMessage
后触发的消费者能够发布 ReconstructionFinishedMessage
,从而触发上述 During
?我的消息交换结构是否正确?
目前,对于
await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId });
,我在日志中遇到异常,指出 R-FAULT rabbitmq://localhost/saga.service d4070000-7b3b-704d-0f10-08d99942c959 Nanox.GC.Shared.AppCore.Messages.ReconstructionFinishedMessage ReconCaller.Saga.ArcProcess(00:00:04.1132604)
,而消息中的 GUID 实际上是 MessageId
。我在 RabbitMQ 中的消息被路由到 saga.service_error
,并带有异常:Messages types must not be System types: System.Threading.Tasks.Task<Nanox.GC.Shared.AppCore.Messages.ProcessingFinishedMessage> (Parameter 'T')
。
看来我确实遗漏了什么……
我的目的是启动处理,该处理将由几个消费者按顺序处理多个阶段。所以在这里我尝试构建一个简单的状态机,每当有人调用
StartProcessing
时就会启动,然后每个消费者都会完成其工作并触发 FinishedStepX
,这会将状态机提升到新的步骤并启动下一个消费者,直到所有处理完成,状态机将报告 ProcessingComplete
。
感谢您的帮助
首先,您的总线配置有点奇怪,所以我已经清理了它:
// Container-based registration: the saga, its MongoDB repository and the consumers are
// registered on the registration configurator, and ConfigureEndpoints creates their
// receive endpoints.
services.AddMassTransit(cnf =>
{
    var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

    // Must be called on cnf (the registration configurator). The name cfg only exists
    // inside the UsingRabbitMq callback below and is not in scope here.
    cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
        .Endpoint(e => e.Name = BusConstants.SagaQueue)
        .MongoDbRepository(connectionString, r =>
        {
            r.DatabaseName = "mongoRepo";
            r.CollectionName = "WorkflowState";
        });

    cnf.AddConsumer<StartReconstructionConsumer>();
    cnf.AddConsumer<ProcessingFinishedConsumer>();

    cnf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(rabbitHost), hst =>
        {
            hst.Username("guest");
            hst.Password("guest");
        });

        // Creates endpoints for everything registered above, including the saga.
        cfg.ConfigureEndpoints(context);
    });
});
发布问题与所使用的方法有关,只有PublishAsync允许使用消息初始值设定项:
// While in ProcessingStartedState, a ReconstructionFinishedEvent records the activation id,
// publishes ProcessingFinishedMessage and finalizes the instance.
During(ProcessingStartedState,
    When(ReconstructionFinishedEvent)
        .Then(context =>
        {
            Console.WriteLine(">> ReconstructionFinishedEvent");
            context.Instance.ActivationId = context.Data.ActivationId;
        })
        // PublishAsync is the activity that accepts a message initializer (context.Init).
        .PublishAsync(context =>
            context.Init<ProcessingFinishedMessage>(new { context.Data.ActivationId }))
        .TransitionTo(ProcessingFinishedState)
        .Finalize());
这应该可以解决你的问题。
在 @Chris Patterson 的慷慨帮助下,可行的解决方案是:
定义:
/// <summary>
/// Saga instance used by <c>ArcStateMachine</c>. It is stored through the MongoDB
/// repository registered with <c>AddSagaStateMachine&lt;ArcStateMachine, ArcProcess&gt;()</c>.
/// </summary>
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    /// <summary>Correlation id of this saga instance (member of <c>SagaStateMachineInstance</c>).</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state of the instance; bound via <c>InstanceState(x => x.CurrentState)</c>.</summary>
    public string CurrentState { get; set; }

    /// <summary>Version member required by <c>ISagaVersion</c>.</summary>
    public int Version { get; set; }

    /// <summary>Activation id copied from the incoming messages by the state machine.</summary>
    public Guid ActivationId { get; set; }
}
/// <summary>
/// Message contract that starts processing for an activation.
/// </summary>
public interface StartProcessingMessage
{
    /// <summary>
    /// Activation the message refers to; <c>MessageContracts.Initialize</c> registers it
    /// as the correlation id for this message type.
    /// </summary>
    Guid ActivationId { get; }
}
/// <summary>
/// Message contract announcing that processing for an activation has finished.
/// </summary>
public interface ProcessingFinishedMessage
{
    /// <summary>
    /// Activation the message refers to; <c>MessageContracts.Initialize</c> registers it
    /// as the correlation id for this message type.
    /// </summary>
    Guid ActivationId { get; }
}
/// <summary>
/// One-time registration of the correlation ids used by the saga's message contracts.
/// </summary>
public static class MessageContracts
{
    // Becomes true after the registrations below have run once.
    static bool _initialized;

    /// <summary>
    /// Registers <c>ActivationId</c> as the correlation id of both message contracts on the
    /// global send topology. Once this has completed, further calls do nothing.
    /// </summary>
    public static void Initialize()
    {
        if (!_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(message => message.ActivationId);

            _initialized = true;
        }
    }
}
消费者:
/// <summary>
/// Consumes <c>StartProcessingMessage</c>, waits for a fixed delay that stands in for the
/// actual work, and then publishes <c>ProcessingFinishedMessage</c> for the same activation.
/// </summary>
public class StartProcessingConsumer : IConsumer<StartProcessingMessage>
{
    // Duration of the simulated work.
    private static readonly TimeSpan SimulatedWorkDuration = TimeSpan.FromSeconds(5);

    private readonly ILogger<StartProcessingConsumer> _logger;

    /// <param name="logger">Logger used to trace the start and end of the simulated work.</param>
    public StartProcessingConsumer(ILogger<StartProcessingConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Handles one <c>StartProcessingMessage</c> and publishes the completion message.
    /// </summary>
    /// <param name="context">Consume context carrying the message and the publish endpoint.</param>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;

        // Message templates (not interpolated strings) keep ActivationId as a structured
        // log property and avoid formatting the string when the level is disabled.
        _logger.LogInformation("Received Scan: {ActivationId}", activationId);

        // Pass the consume context's token so the wait ends when the consume is cancelled.
        await Task.Delay(SimulatedWorkDuration, context.CancellationToken);

        _logger.LogInformation("Finish Scan: {ActivationId}", activationId);
        await context.Publish<ProcessingFinishedMessage>(new { ActivationId = activationId });
    }
}
/// <summary>
/// Consumes <c>ProcessingFinishedMessage</c> and logs the activation that finished.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    private readonly ILogger<ProcessingFinishedConsumer> _logger;

    /// <param name="logger">Logger that receives the completion entry.</param>
    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Logs the activation id of the finished processing run.
    /// </summary>
    /// <param name="context">Consume context carrying the message.</param>
    /// <returns>An already-completed task; nothing asynchronous happens here.</returns>
    public Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        // Message template instead of an interpolated string keeps ActivationId structured.
        _logger.LogInformation("Finish {ActivationId}", context.Message.ActivationId);

        // No async work to await, so return the completed task directly.
        return Task.CompletedTask;
    }
}
状态机定义:
/// <summary>
/// State machine over <c>ArcProcess</c> instances: a <c>StartProcessingMessage</c> moves a new
/// instance into <see cref="ProcessingStartedState"/>, and a <c>ProcessingFinishedMessage</c>
/// received in that state finalizes it.
/// </summary>
public class ArcStateMachine : MassTransitStateMachine<ArcProcess>
{
    // Registers the message correlation ids before the state machine type is first used.
    static ArcStateMachine() => MessageContracts.Initialize();

    /// <summary>Declares the state property and the event handling of the workflow.</summary>
    public ArcStateMachine()
    {
        // Bind the instance state to ArcProcess.CurrentState.
        InstanceState(instance => instance.CurrentState);

        Initially(
            When(ProcessingStartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(ProcessingStartedState));

        During(ProcessingStartedState,
            When(ProcessingFinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .Finalize());
    }

    /// <summary>State entered after <see cref="ProcessingStartedEvent"/>.</summary>
    public State ProcessingStartedState { get; }

    /// <summary>Declared state; not referenced by the transitions defined in the constructor.</summary>
    public State ProcessingFinishedState { get; }

    /// <summary>Event raised for <c>StartProcessingMessage</c>.</summary>
    public Event<StartProcessingMessage> ProcessingStartedEvent { get; }

    /// <summary>Event raised for <c>ProcessingFinishedMessage</c>.</summary>
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}
MassTransit 设置:
// Everything below is registered only when a RabbitMQ host is configured.
var rabbitHost = Configuration["RABBIT_MQ_HOST"];
if (rabbitHost.IsNotEmpty())
{
    services.AddMassTransit(registration =>
    {
        var mongoConnectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

        // Saga state machine on its own named endpoint, with a MongoDB repository.
        registration.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
            .Endpoint(endpoint => endpoint.Name = BusConstants.SagaQueue)
            .MongoDbRepository(mongoConnectionString, repository =>
            {
                repository.DatabaseName = "mongoRepo";
                repository.CollectionName = "WorkflowState";
            });

        registration.AddConsumer<StartProcessingConsumer>();
        registration.AddConsumer<ProcessingFinishedConsumer>();

        registration.UsingRabbitMq((busContext, rabbit) =>
        {
            rabbit.Host(new Uri(rabbitHost), host =>
            {
                host.Username("guest");
                host.Password("guest");
            });

            // Creates endpoints for the saga and the consumers registered above.
            rabbit.ConfigureEndpoints(busContext);
        });
    });

    services.AddMassTransitHostedService();

    services.AddSwaggerGen(swagger =>
    {
        swagger.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
    });
}
这个例子对我理解 MassTransit 的基础知识如何工作有很大帮助。