我正在使用 MassTransit 和 Amazon MQ 实施请求/响应。 (.NET 6 WebApi) 主机配置如下:
services
.AddMassTransit(x =>
{
x.AddConsumer<ConfigurationConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri($"amqps://{rabbitMqSettings.Host}:{rabbitMqSettings.Port}/{Uri.EscapeDataString("/vhostname")}"), h =>
{
h.Username(rabbitMqSettings.Username);
h.Password(rabbitMqSettings.Password);
});
cfg.ConfigureEndpoints(context);
});
x.AddRequestClient<IConfigurationCommand>();
})
.AddMassTransitHostedService();
客户端使用相同的配置,无需 x.AddConsumer() 和 cfg.ConfigureEndpoints(context)。
问题是:在客户端调用 GetResponse 将成功执行主机上的消费者,该消费者似乎按预期响应,但客户端永远不会得到结果并超时。
另一方面,当我在同一主机上使用 Web api 控制器来启动 GetResponse 时,一切都按预期工作。
确保主机和客户端应用程序之间的消息类型相同。
另外,请确保使用托管服务或其他方式在客户端上启动总线。
在我的响应类型上使用继承类时,我遇到了同样的超时问题,其中父类有一个失败响应的方法!
最后,当将响应强制转换为子类时,问题得到了解决。
MyCommon-lib:
public partial interface IEvent
{
bool Succeeded { get; }
string Message { get; }
}
public abstract record Event<TModel> : IEvent
where TModel : IEvent
{
public bool Succeeded { get; set; } = true;
public string Message { get; set; } = string.Empty;
public TModel Failed(Exception exception)
{
Succeeded = false;
Message = exception.InnerException?.Message ?? exception.Message;
return (TModel)(object)this; //! problem was here, where I cannot return this as simple as 'return this;'
}
public TModel Failed(string message)
{
Succeeded = false;
Message = message;
return (TModel)(object)this;
}
}
public partial interface IEvent
{
public record PointHistory(IList<PointHistoryDto>? Group, IEnumerable<PointHistoryDto>? Filtered) : Event<PointHistory>, IEvent;
public record PointApplied() : Event<PointApplied>, IEvent;
}
微服务1::MyMediator:
public async Task DoSomething()
{
...
var pointHistory = await bus.Request<ICommand.PointHistory, IEvent.PointHistory>( ... );
if (!pointHistory.Message.Succeeded) throw new UserException(pointHistory.Message.Message);
...
var pointApplied = await bus.Request<ICommand.PointApply, IEvent.PointApplied>( ... );
if (!pointHistory.Message.Succeeded) throw new UserException(pointHistory.Message.Message);
...
}
微服务2::我的消费者1:
internal class MyConsumer1(
IPointService service
) : IConsumer<ICommand.PointApply>
{
public async Task Consume(ConsumeContext<ICommand.PointApply> context)
{
try
{
ICommand.PointApply request = context.Message;
...
//! response
await context.RespondAsync(new IEvent.PointApplied());
}
catch (UserException err) { await context.RespondAsync(new IEvent.PointApplied().Failed(err)); }
}
}