我正在向比特币服务器写入 PoC P2P 节点握手。 我向目标节点发送一条“版本”消息,目标节点会使用相应的版本消息进行响应。到目前为止一切顺利。
但是,来自目标节点的版本消息有时可能需要两分钟以上才能到达。服务器最终确实给出了成功的响应,但由于我已对其他网络活动应用了 5 秒超时,我认为为了保持一致性,我应该在这里执行相同的操作。
这是没有任何超时检查的代码。这很有效,尽管有时非常慢:
use bitcoin::{
consensus::Decodable,
p2p::message::{self},
};
use std::{
io::BufReader,
net::{IpAddr, SocketAddr, TcpStream},
time::{Duration, SystemTime},
};
pub async fn handshake(to_address: IpAddr, port: u16, timeout: u64) -> Result<()> {
let target_node = SocketAddr::new(to_address, port);
let mut write_stream = TcpStream::connect_timeout(&target_node, Duration::from_secs(timeout))?;
// Build and send version message
send_msg_version(&target_node, &mut write_stream)?;
let read_stream = write_stream.try_clone()?;
let mut stream_reader = BufReader::new(read_stream);
let response1 = message::RawNetworkMessage::consensus_decode(&mut stream_reader)?;
match response1.payload() {
// Do stuff here
}
// More stuff here...
}
对
consensus_decode()
的调用有时需要很长时间,因此我尝试将其包装在 future::ready()
中,然后将其放入对 tokio::time::timeout()
的调用中,如下所示:
let response1 = if let Ok(net_msg) = tokio::time::timeout(
Duration::from_secs(timeout),
future::ready(message::RawNetworkMessage::consensus_decode(&mut stream_reader)?),
)
.await
{
net_msg
} else {
return Err(CustomError(format!(
"TIMEOUT: {} failed to respond with VERSION message within {} seconds",
to_address, timeout
)));
};
(&mut stream_reader)?
部分成功捕获任何解码错误,但如果消息成功到达,但速度太慢,tokio::time::timeout
无法捕获它。
我在这里缺少什么?
谢谢
您不能使用
future::ready
使同步函数调用异步,请使用 tokio::task::spawn_blocking
代替:
use std::time::Duration;
fn takes_long() {
std::thread::sleep(Duration::from_millis(100));
}
#[tokio::main]
async fn main() {
let x = tokio::time::timeout(Duration::from_millis(10), tokio::task::spawn_blocking(|| {
takes_long();
})).await;
eprintln!("{x:?}");
}
产生以下输出:
Err(Elapsed(()))
您的方法的问题在于,在调用
future::ready
之前(更不用说稍后调用 tokio::time::timeout
),函数 consensus_decode
必须已经返回并产生一个值,此时 future::ready
可以立即返回并且您的超时永远不会触发。