我正在使用公共交通与我的rabbitmq 服务交谈。我想要实现的任务是通过大众运输定期检查与rabbitmq服务的连接。
到目前为止我尝试过并取得的成就:
我正在使用 IBusControl.CheckHealth() 来获取公共交通巴士的状态。然而,如果连接被强制关闭(例如通过rabbitmq管理插件),健康状态不会改变(仍然显示健康)
我正在使用rabbitmq客户端来检查我是否可以连接到该服务。 这允许我检查rabbitmq是否可达,但如果 公共交通使用的连接是健康的。
那么我需要做什么来检查所使用的连接 Masstransit与rabbitmq交换消息? 暴露一个 ASP.NET Core 中的 healthcheck 端点不是我想要的。我想要 检查公共交通所在的同一应用程序中的状态 已配置。
配置
.AddMassTransit( x => {
string? rabbitMqConnectionString = LoadConnectionstring();
(string username, string password, string host, ushort port) = ParseRabbitMqConnectionString( rabbitMqConnectionString );
bool isDevelopmentEnvironment = true; // we assume we are starting from debug
x.UsingRabbitMq( ( context, cfg ) => {
cfg.Host( host, (ushort)port, "/", h => {
h.Username( username );
h.Password( password );
if( isDevelopmentEnvironment )
{
h.RequestedConnectionTimeout( TimeSpan.FromSeconds( 1 ) );
}
} );
if( isDevelopmentEnvironment )
{
// Konfiguriere die Anzahl der Verbindungsversuche
cfg.UseMessageRetry( retryConfig => {
retryConfig.None();
} );
}
} );
}
希望大家可以帮助我。
我最终使用的解决方案是创建一个接收器端点:
// used to supervise if the rabbitmq connection is healthy
cfg.ReceiveEndpoint( "test-queue", e => {
e.UseMessageRetry( r => r.None() );
e.Consumer<HealthMessageConsumer>();
} );
然后发送请求周期检查与rabbitmq的连接是否稳定
private async Task<bool> SendHealthMessage()
{
try
{
ISendEndpoint endpoint = await busControl.GetSendEndpoint( new Uri( "rabbitmq://<host>/test-queue" ) );
await endpoint.Send( new HealthMessage(), new System.Threading.CancellationTokenSource( TimeSpan.FromSeconds( 3 ) ).Token );
return true; // Connection is healthy
}
catch( Exception ex )
{
// Catch any exceptions related to RabbitMQ connection issues
Log.Error( $"Error sending test message: {ex.Message}", ex );
return false; // Connection failed
}
}
注意:使用取消令牌,因为如果消息发送失败,它会在等待发送时挂起
说实话,这是一种解决方法,但它对我有用。我有点困惑为什么大众运输让检查rabbitmq连接变得如此困难,希望在以后的版本中他们能更清楚地说明如何实现这一点。