我正在使用 Rust 和
russh
板条箱来实现 SSH 隧道,以便通过本地侦听端口访问远程服务器。
这是我当前的实现代码:
let mut ssh_client= russh::client::connect(
Arc::new(Config::default()),
format!("{}:{}", self.host, self.port),
IHandler {},
).await?;
ssh_client.authenticate_password(self.username.clone(), self.password.clone()).await?;
let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).await?;
let addr = listener.local_addr()?;
let channel = ssh_client.channel_open_direct_tcpip(
self.forwarding_host.clone(),
self.forwarding_port as u32,
Ipv4Addr::LOCALHOST.to_string(),
addr.port() as u32,
).await?;
let mut remote_stream = channel.into_stream();
tokio::spawn(async move {
loop {
if let Ok((mut local_stream, _)) = listener.accept().await {
tokio::spawn(async move {
select! {
result = tokio::io::copy_bidirectional_with_sizes(&mut local_stream, &mut remote_stream, 255, 8 * 1024) => {
// close
}
});
}
if rx_clone.changed().await.is_ok() {
break;
}
}
drop(listener);
Ok::<(), Error>(())
});
上面的代码通过本地 TcpListener 端口成功建立了 SSH 隧道,但它有一个限制:客户端只能创建一个到该端口的连接,后续的连接尝试将被阻止。理想情况下,我想支持多个连接(到 TcpListener)。
russh = "0.45.0"
tokio = "1"
有没有办法让russh支持多连接?或者如何修改代码来实现这个需求?
单个 SSH 隧道支持多个连接。
根据@maxy的建议
每次连接都会调用
。channel_open_direct_tcpip()
我将打开通道的行为从 SSH 客户端移至 TCP 侦听器的接受代码块中。另外,我将退出信号监听方法从
rx_clone.changed().await.is_ok()
修改为rx_clone.has_changed()?
。
最终代码如下:
use anyhow::{Error, Result};
use async_trait::async_trait;
use log::{error, info, warn};
use russh::client::{Config, Handler, Msg, Session};
use russh::keys::key;
use russh::{Channel, ChannelId, Disconnect};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::select;
#[derive(Clone, Debug)]
pub struct SshTunnel {
pub host: String,
pub port: u16,
pub username: String,
pub password: String,
pub forwarding_host: String,
pub forwarding_port: u16,
tx: tokio::sync::watch::Sender<u8>,
rx: tokio::sync::watch::Receiver<u8>,
is_connected: bool,
}
impl SshTunnel {
pub fn new(host: String, port: u16, username: String, password: String, forwarding_host: String, forwarding_port: u16) -> Self {
let (tx, rx) = tokio::sync::watch::channel::<u8>(1);
Self {
host,
port,
username,
password,
forwarding_host,
forwarding_port,
tx,
rx,
is_connected: false,
}
}
pub async fn open(&mut self) -> Result<SocketAddr> {
let mut ssh_client = russh::client::connect(
Arc::new(Config::default()),
format!("{}:{}", self.host, self.port),
IHandler {},
).await?;
ssh_client.authenticate_password(self.username.clone(), self.password.clone()).await?;
let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).await?;
let addr = listener.local_addr()?;
let forwarding_host = self.forwarding_host.clone();
let forwarding_port = self.forwarding_port as u32;
let mut rx_clone = self.rx.clone();
tokio::spawn(async move {
loop {
let mut rx_clone_clone = rx_clone.clone();
if let Ok((mut local_stream, _)) = listener.accept().await {
let channel = ssh_client.channel_open_direct_tcpip(
forwarding_host.clone(),
forwarding_port,
Ipv4Addr::LOCALHOST.to_string(),
addr.port() as u32,
).await?;
let mut remote_stream = channel.into_stream();
tokio::spawn(async move {
select! {
result = tokio::io::copy_bidirectional_with_sizes(&mut local_stream, &mut remote_stream, 255, 8 * 1024) => {
if let Err(e) = result {
error!("Error during bidirectional copy: {}", e);
}
warn!("Bidirectional copy stopped");
}
_ = rx_clone_clone.changed() => {
info!("Received close signal");
}
}
let _ = remote_stream.shutdown().await;
});
}
if rx_clone.has_changed()? {
ssh_client.disconnect(Disconnect::ByApplication, "exit", "none").await?;
break;
}
}
drop(listener);
info!("Stream closed");
Ok::<(), Error>(())
});
self.is_connected = true;
Ok(addr)
}
pub async fn close(&mut self) -> Result<()> {
self.tx.send(0)?;
self.is_connected = false;
Ok(())
}
pub fn is_connected(&self) -> bool {
self.is_connected
}
}
struct IHandler;
#[async_trait]
impl Handler for IHandler {
type Error = Error;
async fn check_server_key(&mut self, _: &key::PublicKey) -> Result<bool, Self::Error> {
Ok(true)
}
}