MassTransit 异常:消息类型不得为系统类型(Messages types must not be System types)

问题描述 投票:0回答:2

我对 MassTransit 还很陌生,不明白我做错了什么会出现以下异常:

Messages types must not be System types

这是我的定义:

/// <summary>
/// Saga instance state used by <c>ArcStateMachine</c>, which is declared as
/// <c>MassTransitStateMachine&lt;ArcProcess&gt;</c>.
/// </summary>
// NOTE(review): BsonIgnoreExtraElements presumably lets the MongoDB driver ignore stored
// fields that have no matching property here — confirm against the driver documentation.
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    // Presumably the member required by SagaStateMachineInstance — confirm.
    public Guid CorrelationId { get; set; }

    // Bound as the instance state by ArcStateMachine via InstanceState(x => x.CurrentState).
    public string CurrentState { get; set; }

    // Presumably the member required by ISagaVersion; not touched by the code shown here.
    public int Version { get; set; }

    // Copied from the incoming message's ActivationId by the state machine's Then handlers.
    public Guid ActivationId { get; set; }
}

/// <summary>
/// One-time registration of the correlation-id selectors for the message contracts.
/// </summary>
public static class MessageContracts
{
    static bool _initialized;

    /// <summary>
    /// Registers <c>ActivationId</c> as the correlation id selector on the global send
    /// topology for each message contract. Calls after the first successful one do nothing.
    /// </summary>
    public static void Initialize()
    {
        if (!_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ReconstructionFinishedMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(message => message.ActivationId);

            // Set last so the flag is only raised once every selector is registered.
            _initialized = true;
        }
    }
}

我的 2 个消费者是:

/// <summary>
/// Consumes <c>StartProcessingMessage</c>: logs the activation id, waits a fixed delay,
/// logs again and publishes a <c>ReconstructionFinishedMessage</c> with the same id.
/// </summary>
public class StartReconstructionConsumer : IConsumer<StartProcessingMessage>
{
    private const int MillisecondsPerSecond = 1000;

    // Length of the delay between the two log entries.
    private const int DelaySeconds = 5;

    private readonly ILogger<StartReconstructionConsumer> _logger;

    public StartReconstructionConsumer(ILogger<StartReconstructionConsumer> logger) => _logger = logger;

    /// <summary>
    /// Handles one message and publishes the follow-up message once the delay has elapsed.
    /// </summary>
    /// <param name="context">Consume context carrying the received message.</param>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _logger.LogInformation($"Received Scan: {activationId}");

        var delayMilliseconds = DelaySeconds * MillisecondsPerSecond;
        await Task.Delay(delayMilliseconds);
        _logger.LogInformation($"Finish Scan: {activationId}");

        // The anonymous object is the initializer for the published message.
        var finished = new { ActivationId = activationId };
        await context.Publish<ReconstructionFinishedMessage>(finished);
    }
}

/// <summary>
/// Consumes <c>ProcessingFinishedMessage</c> and logs its activation id.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    private readonly ILogger<ProcessingFinishedConsumer> _logger;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger) => _logger = logger;

    /// <summary>
    /// Writes a single informational log entry for the received message.
    /// </summary>
    /// <param name="context">Consume context carrying the received message.</param>
    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        _logger.LogInformation($"Finish {context.Message.ActivationId}");

        // Nothing asynchronous to wait for; awaiting a completed task keeps the method async.
        await Task.CompletedTask;
    }
}

这是状态机定义:

/// <summary>
/// State machine over <c>ArcProcess</c> instances. It reacts to
/// <c>StartProcessingMessage</c> in the initial state and to
/// <c>ReconstructionFinishedMessage</c> while in <c>ProcessingStartedState</c>.
/// </summary>
public class ArcStateMachine: MassTransitStateMachine<ArcProcess>
{
    // Registers the correlation-id selectors before any instance of the machine is built.
    static ArcStateMachine()
    {
        MessageContracts.Initialize();
    }

    public ArcStateMachine()
    {
        // The current state is kept in ArcProcess.CurrentState.
        InstanceState(x => x.CurrentState);

        // Initial state: on StartProcessingMessage, remember the ActivationId and move on.
        Initially(
            When(ProcessingStartedEvent)
            .Then(context =>
            {
                Console.WriteLine(">> ProcessingStartedEvent");
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .TransitionTo(ProcessingStartedState));

        // While processing has started: on ReconstructionFinishedMessage, publish
        // ProcessingFinishedMessage, transition and finalize.
        During(ProcessingStartedState,
            When(ReconstructionFinishedEvent)
            .Then(context =>
            {
                Console.WriteLine(">> ReconstructionFinishedEvent");
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            // NOTE(review): Init<T> appears to return Task<T>; handing that to Publish makes
            // the Task itself the published message type, which matches the reported
            // "Messages types must not be System types: System.Threading.Tasks.Task<...>"
            // fault. PublishAsync looks like the overload meant for message initializers —
            // confirm against the MassTransit documentation.
            .Publish(context =>
            {
                return context.Init<ProcessingFinishedMessage>(new { ActivationId = context.Data.ActivationId });
            })
            .TransitionTo(ProcessingFinishedState)
            .Finalize());
    }

    // States. ReconstructionStartedState and ReconstructionFinishedState are declared but
    // not referenced by the constructor above.
    public State ProcessingStartedState { get; }
    public State ReconstructionStartedState { get; }
    public State ReconstructionFinishedState { get; }
    public State ProcessingFinishedState { get; }

    // Events, one per message contract. ReconstructionStartedEvent and
    // ProcessingFinishedEvent are declared but not referenced by the constructor above.
    public Event<StartProcessingMessage> ProcessingStartedEvent { get; }
    public Event<ReconstructionStartedMessage> ReconstructionStartedEvent { get; }
    public Event<ReconstructionFinishedMessage> ReconstructionFinishedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}

MassTransit 的设置如下所示:

// Bus wiring. Everything inside the if-block below, including the Swagger registration,
// only runs when rabbitHost passes the IsNotEmpty() check.
var rabbitHost = Configuration["RABBIT_MQ_HOST"];

        if (rabbitHost.IsNotEmpty())
        {
            services.AddMassTransit(cnf =>
            {
                var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

                // The state machine and its MongoDB repository are created by hand here and
                // attached to an explicit receive endpoint further down.
                var machine = new ArcStateMachine();
                var repository = MongoDbSagaRepository<ArcProcess>.Create(connectionString,
                    "mongoRepo", "WorkflowState");

                cnf.AddConsumer(typeof(StartReconstructionConsumer));
                cnf.AddConsumer(typeof(ProcessingFinishedConsumer));

                cnf.UsingRabbitMq((context, cfg) =>
                {
                    // NOTE(review): broker credentials are hard-coded as guest/guest.
                    cfg.Host(new Uri(rabbitHost), hst =>
                    {
                        hst.Username("guest");
                        hst.Password("guest");
                    });

                    cfg.ConfigureEndpoints(context);

                    // Saga endpoint on the BusConstants.SagaQueue queue, using the manually
                    // created machine and repository.
                    cfg.ReceiveEndpoint(BusConstants.SagaQueue,
                        e => e.StateMachineSaga(machine, repository));
                });
            });

            services.AddMassTransitHostedService();

            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
            });
        }

我有几个问题:

  1. 事件实际上是在何时通过发布消息触发的?即,在我的示例中

    await _BusInstance.Bus.Publish<StartProcessingMessage>(new { ActivationId = id });
    是从 WebApi 调用的,该 WebApi 被
    StartReconstructionConsumer
    使用,但实际上状态机开始与
    Initially(When(ProcessingStartedEvent)...
    一起工作?

  2. 我的处理应确保我已经处于

    ProcessingStartedState
    状态,以便
    During(ProcessingStartedState, When(ReconstructionFinishedEvent)...
    正确执行。那么,我如何确保在收到
    StartProcessingMessage
    后触发的消费者可以发布
    ReconstructionFinishedMessage
    ,从而触发该
    During
    ?我是否正确构建了消息交换?

  3. 目前,对于

    await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId });
    ,我在日志中遇到异常,指出
    R-FAULT rabbitmq://localhost/saga.service d4070000-7b3b-704d-0f10-08d99942c959 Nanox.GC.Shared.AppCore.Messages.ReconstructionFinishedMessage ReconCaller.Saga.ArcProcess(00:00:04.1132604)
    ,而消息中的 GUID 实际上是
    MessageId
    。我在rabbitmq中的消息被路由到
    saga.service_error
    ,但其中附带的异常是
    Messages types must not be System types: System.Threading.Tasks.Task<Nanox.GC.Shared.AppCore.Messages.ProcessingFinishedMessage> (Parameter 'T')

看来我确实遗漏了什么……

我的目的是启动处理,该处理将由几个消费者按顺序处理多个阶段。所以在这里我尝试构建一个简单的状态机,每当有人调用

StartProcessing
时就会启动,然后每个消费者都会完成其工作并触发
FinishedStepX
,这会将状态机提升到新的步骤并启动下一个消费者,直到所有处理完成,状态机将报告
ProcessingComplete

感谢您的帮助

c# masstransit
2个回答
1
投票

首先,您的总线配置有点奇怪,所以我已经清理了它:

// Registers the saga state machine, both consumers and the RabbitMQ transport.
// rabbitHost is expected to be defined by the surrounding code.
services.AddMassTransit(cnf =>
{
    var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

    // The saga is registered on the registration configurator (cnf). The bus factory
    // configurator (cfg) only exists inside the UsingRabbitMq callback below, so
    // referring to it here would not compile.
    cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
        .Endpoint(e => e.Name = BusConstants.SagaQueue)
        .MongoDbRepository(connectionString, r =>
        {
            r.DatabaseName = "mongoRepo";
            r.CollectionName = "WorkflowState";
        });

    cnf.AddConsumer<StartReconstructionConsumer>();
    cnf.AddConsumer<ProcessingFinishedConsumer>();

    cnf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(rabbitHost), hst =>
        {
            hst.Username("guest");
            hst.Password("guest");
        });

        // Creates the receive endpoints for everything registered above.
        cfg.ConfigureEndpoints(context);
    });
});

发布时的问题与所使用的方法有关:只有 PublishAsync 才允许使用消息初始化器(message initializer):

// While in ProcessingStartedState: on ReconstructionFinishedMessage, record the
// ActivationId, publish ProcessingFinishedMessage, then transition and finalize.
During(ProcessingStartedState,
    When(ReconstructionFinishedEvent)
        .Then(ctx =>
        {
            Console.WriteLine(">> ReconstructionFinishedEvent");
            ctx.Instance.ActivationId = ctx.Data.ActivationId;
        })
        // PublishAsync takes the result of the message initializer directly.
        .PublishAsync(ctx => ctx.Init<ProcessingFinishedMessage>(new { ctx.Data.ActivationId }))
        .TransitionTo(ProcessingFinishedState)
        .Finalize());

这应该可以解决你的问题。


1
投票

在 @Chris Patterson 的慷慨帮助下,可行的解决方案是:

定义:

/// <summary>
/// Saga instance state used by <c>ArcStateMachine</c>, which is declared as
/// <c>MassTransitStateMachine&lt;ArcProcess&gt;</c>.
/// </summary>
// NOTE(review): BsonIgnoreExtraElements presumably lets the MongoDB driver ignore stored
// fields that have no matching property here — confirm against the driver documentation.
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    // Presumably the member required by SagaStateMachineInstance — confirm.
    public Guid CorrelationId { get; set; }

    // Bound as the instance state by ArcStateMachine via InstanceState(x => x.CurrentState).
    public string CurrentState { get; set; }

    // Presumably the member required by ISagaVersion; not touched by the code shown here.
    public int Version { get; set; }

    // Copied from the incoming message's ActivationId by the state machine's Then handlers.
    public Guid ActivationId { get; set; }
}

/// <summary>
/// Message contract consumed by <c>StartProcessingConsumer</c> and bound to
/// <c>ArcStateMachine.ProcessingStartedEvent</c>.
/// </summary>
public interface StartProcessingMessage
{
    // Registered as the correlation id selector in MessageContracts.Initialize().
    Guid ActivationId { get; }
}

/// <summary>
/// Message contract published by <c>StartProcessingConsumer</c>, consumed by
/// <c>ProcessingFinishedConsumer</c> and bound to
/// <c>ArcStateMachine.ProcessingFinishedEvent</c>.
/// </summary>
public interface ProcessingFinishedMessage
{
    // Registered as the correlation id selector in MessageContracts.Initialize().
    Guid ActivationId { get; }
}

/// <summary>
/// One-time registration of the correlation-id selectors for the message contracts.
/// </summary>
public static class MessageContracts
{
    static bool _initialized;

    /// <summary>
    /// Registers <c>ActivationId</c> as the correlation id selector on the global send
    /// topology for both message contracts. Calls after the first successful one do nothing.
    /// </summary>
    public static void Initialize()
    {
        if (!_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(message => message.ActivationId);

            // Set last so the flag is only raised once every selector is registered.
            _initialized = true;
        }
    }
}

消费者:

/// <summary>
/// Consumes <c>StartProcessingMessage</c>: logs the activation id, waits a fixed delay,
/// logs again and publishes a <c>ProcessingFinishedMessage</c> with the same id.
/// </summary>
public class StartProcessingConsumer : IConsumer<StartProcessingMessage>
{
    private const int MillisecondsPerSecond = 1000;

    // Length of the delay between the two log entries.
    private const int DelaySeconds = 5;

    private readonly ILogger<StartProcessingConsumer> _logger;

    public StartProcessingConsumer(ILogger<StartProcessingConsumer> logger) => _logger = logger;

    /// <summary>
    /// Handles one message and publishes the follow-up message once the delay has elapsed.
    /// </summary>
    /// <param name="context">Consume context carrying the received message.</param>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _logger.LogInformation($"Received Scan: {activationId}");

        var delayMilliseconds = DelaySeconds * MillisecondsPerSecond;
        await Task.Delay(delayMilliseconds);
        _logger.LogInformation($"Finish Scan: {activationId}");

        // The anonymous object is the initializer for the published message.
        var finished = new { ActivationId = activationId };
        await context.Publish<ProcessingFinishedMessage>(finished);
    }
}

/// <summary>
/// Consumes <c>ProcessingFinishedMessage</c> and logs its activation id.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    private readonly ILogger<ProcessingFinishedConsumer> _logger;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger) => _logger = logger;

    /// <summary>
    /// Writes a single informational log entry for the received message.
    /// </summary>
    /// <param name="context">Consume context carrying the received message.</param>
    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        _logger.LogInformation($"Finish {context.Message.ActivationId}");

        // Nothing asynchronous to wait for; awaiting a completed task keeps the method async.
        await Task.CompletedTask;
    }
}

状态机定义:

/// <summary>
/// State machine over <c>ArcProcess</c> instances: <c>StartProcessingMessage</c> moves a new
/// instance into <c>ProcessingStartedState</c>, and <c>ProcessingFinishedMessage</c>
/// finalizes it from there.
/// </summary>
public class ArcStateMachine : MassTransitStateMachine<ArcProcess>
{
    // Registers the correlation-id selectors before any instance of the machine is built.
    static ArcStateMachine() => MessageContracts.Initialize();

    public ArcStateMachine()
    {
        // The current state is kept in ArcProcess.CurrentState.
        InstanceState(saga => saga.CurrentState);

        // Initial state: remember the ActivationId and start processing.
        Initially(
            When(ProcessingStartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(ProcessingStartedState));

        // Processing started: record the ActivationId again and finalize the instance.
        During(ProcessingStartedState,
            When(ProcessingFinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .Finalize());
    }

    // States. ProcessingFinishedState is declared but not referenced by the constructor.
    public State ProcessingStartedState { get; }
    public State ProcessingFinishedState { get; }

    // Events, one per message contract.
    public Event<StartProcessingMessage> ProcessingStartedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}

MassTransit 设置:

// Bus wiring. Everything inside the if-block, including the Swagger registration,
// only runs when rabbitHost passes the IsNotEmpty() check.
var rabbitHost = Configuration["RABBIT_MQ_HOST"];

if (rabbitHost.IsNotEmpty())
{
    services.AddMassTransit(registration =>
    {
        var mongoConnectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

        // Saga registration: endpoint name and MongoDB repository settings.
        registration.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
            .Endpoint(endpoint => endpoint.Name = BusConstants.SagaQueue)
            .MongoDbRepository(mongoConnectionString, repository =>
            {
                repository.DatabaseName = "mongoRepo";
                repository.CollectionName = "WorkflowState";
            });

        registration.AddConsumer<StartProcessingConsumer>();
        registration.AddConsumer<ProcessingFinishedConsumer>();

        registration.UsingRabbitMq((context, bus) =>
        {
            var hostAddress = new Uri(rabbitHost);
            bus.Host(hostAddress, host =>
            {
                host.Username("guest");
                host.Password("guest");
            });

            // Creates the receive endpoints for everything registered above.
            bus.ConfigureEndpoints(context);
        });
    });

    services.AddMassTransitHostedService();

    services.AddSwaggerGen(options =>
    {
        var apiInfo = new OpenApiInfo { Title = "MyApp", Version = "v1" };
        options.SwaggerDoc("v1", apiInfo);
    });
}

这个例子对我理解 MassTransit 的基本工作原理有很大帮助。

© www.soinside.com 2019 - 2024. All rights reserved.