成功发起请求后无法获得 MassTransit 响应

问题描述 投票:0回答:2

我正在使用 MassTransit 和 Amazon MQ 实施请求/响应。 (.NET 6 WebApi) 主机配置如下:

services
    .AddMassTransit(x =>
    {
        x.AddConsumer<ConfigurationConsumer>();
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(new Uri($"amqps://{rabbitMqSettings.Host}:{rabbitMqSettings.Port}/{Uri.EscapeDataString("/vhostname")}"), h =>
            {
                h.Username(rabbitMqSettings.Username);
                h.Password(rabbitMqSettings.Password);
            });
            cfg.ConfigureEndpoints(context);
        });
        x.AddRequestClient<IConfigurationCommand>();
    })
    .AddMassTransitHostedService();

客户端使用相同的配置,无需 x.AddConsumer() 和 cfg.ConfigureEndpoints(context)。

问题是:在客户端调用 GetResponse 将成功执行主机上的消费者,该消费者似乎按预期响应,但客户端永远不会得到结果并超时。

另一方面,当我在同一主机上使用 Web api 控制器来启动 GetResponse 时,一切都按预期工作。

.net-core masstransit
2个回答
1
投票

确保主机和客户端应用程序之间的消息类型相同。

另外,请确保使用托管服务或其他方式在客户端上启动总线。


0
投票

在我的响应类型上使用继承类时,我遇到了同样的超时问题,其中父类有一个失败响应的方法!

最后,当将响应强制转换为子类时,问题得到了解决。

MyCommon-lib:

public partial interface IEvent
{
    bool Succeeded { get; }
    string Message { get; }
}
public abstract record Event<TModel> : IEvent
    where TModel : IEvent
{
    public bool Succeeded { get; set; } = true;
    public string Message { get; set; } = string.Empty;

    public TModel Failed(Exception exception)
    {
        Succeeded = false;
        Message = exception.InnerException?.Message ?? exception.Message;
        return (TModel)(object)this; //! problem was here, where I cannot return this as simple as 'return this;'
    }
    public TModel Failed(string message)
    {
        Succeeded = false;
        Message = message;
        return (TModel)(object)this;
    }
}

public partial interface IEvent
{
    public record PointHistory(IList<PointHistoryDto>? Group, IEnumerable<PointHistoryDto>? Filtered) : Event<PointHistory>, IEvent;
    public record PointApplied() : Event<PointApplied>, IEvent;
}

微服务1::MyMediator:

public async Task DoSomething()
{
    ...
    var pointHistory = await bus.Request<ICommand.PointHistory, IEvent.PointHistory>( ... );
    if (!pointHistory.Message.Succeeded) throw new UserException(pointHistory.Message.Message);
    ...
    var pointApplied = await bus.Request<ICommand.PointApply, IEvent.PointApplied>( ... );
    if (!pointHistory.Message.Succeeded) throw new UserException(pointHistory.Message.Message);
    ...
}

微服务2::我的消费者1:

internal class MyConsumer1(
    IPointService service
) : IConsumer<ICommand.PointApply>
{
    public async Task Consume(ConsumeContext<ICommand.PointApply> context)
    {
        try
        {
            ICommand.PointApply request = context.Message;
            
            ...
            
            //! response
            await context.RespondAsync(new IEvent.PointApplied());
        }
        catch (UserException err) { await context.RespondAsync(new IEvent.PointApplied().Failed(err)); }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.