我使用 Rust 的 redis crate。我正在使用集群模式并不断尝试使用无限循环获取某个密钥。然而,当我杀死存储该密钥的主节点时,程序遇到“Broken pipeline (os error 32)”并崩溃。
我希望它继续运行并从新的master(被杀死的master的前一个slave)那里获取密钥。
use std::error::Error;
use redis::Commands;
use redis::cluster::ClusterClient;
fn main() -> Result<(), Box> {
let nodes = vec!["redis://127.0.0.1:7000/","redis://127.0.0.1:7001/", "redis://127.0.0.1:7002/","redis://127.0.0.1:7003/", "redis://127.0.0.1:7004/", "redis://127.0.0.1:7005/"];
let client = ClusterClient::new(nodes)?;
let mut con = client.get_connection()?;
loop {
let a : String =con.get("test")?;
}
Ok(())
}
然后我docker杀死了关键“test”的主节点, 我得到的错误是
发生错误:管道损坏(操作系统错误 32)
提前谢谢您。
使用Redis集群模式时,如果存储特定key的master节点宕机,集群会自动从可用的slave中选举出新的master。但是,您的 Rust 程序需要意识到这一点并妥善处理这种情况。
您可以使用 Rust 中的 redis 和 retry crate 来实现此行为。以下是当由于节点故障而发生连接错误时如何实现重试逻辑:
首先,确保您已将 redis 和重试 crate 添加到您的 Cargo.toml 中:
[dependencies]
redis = "0.17.0"
retry = "3.0.0"
现在,让我们编写一个函数,通过重试逻辑从 Redis 集群获取值:
use redis::{Client, Commands};
use retry::delay::Fixed;
use retry::retry;
fn get_value_with_retry(key: &str, connection_string: &str) -> Result<String, redis::RedisError> {
let client = Client::open(connection_string)?;
let mut conn = client.get_connection()?;
let retry_result = retry(Fixed::from_millis(5000), || {
let value: String = conn.get(key)?;
Ok(value)
});
retry_result
}
在此示例中,我们使用重试箱来包装连接到 Redis 并检索特定键的值的代码。如果发生redis::RedisError(连接错误),重试函数将根据提供的延迟策略(本例中为Fixed::from_millis(5000))重试操作,然后放弃。
现在您可以在主循环中或任何需要访问密钥的地方使用此函数:
fn main() {
loop {
match get_value_with_retry("your_key", "redis://your_redis_cluster") {
Ok(value) => {
println!("Value: {}", value);
// Process the value here
// Break the loop if you want to stop the continuous retrieval
},
Err(error) => {
eprintln!("Error: {:?}", error);
// Handle the error as needed
// e.g., Log the error, wait for a while before retrying, etc.
},
}
}
}
通过此设置,即使 Redis 主节点出现故障,您的程序也应继续运行,并在尝试从新主节点检索密钥时妥善处理连接错误。您可以根据您的具体需求(例如,指数退避、抖动等)通过使用重试箱中的不同延迟实现来调整重试策略。