如何用russh创建支持多连接的SSH隧道?

问题描述 投票:0回答:1

我正在使用 Rust 和

russh
板条箱来实现 SSH 隧道,以便通过本地侦听端口访问远程服务器。

这是我当前的实现代码:

let mut ssh_client= russh::client::connect(
    Arc::new(Config::default()),
    format!("{}:{}", self.host, self.port),
    IHandler {},
).await?;

ssh_client.authenticate_password(self.username.clone(), self.password.clone()).await?;

let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).await?;
let addr = listener.local_addr()?;

let channel = ssh_client.channel_open_direct_tcpip(
    self.forwarding_host.clone(),
    self.forwarding_port as u32,
    Ipv4Addr::LOCALHOST.to_string(),
    addr.port() as u32,
).await?;

let mut remote_stream = channel.into_stream();
tokio::spawn(async move {
    loop {
        if let Ok((mut local_stream, _)) = listener.accept().await {
            tokio::spawn(async move {
                select! {
                    result = tokio::io::copy_bidirectional_with_sizes(&mut local_stream, &mut remote_stream, 255, 8 * 1024) => {
                        // close 
                }
            });
        }
        if rx_clone.changed().await.is_ok() {
            break;
        }
    }
    drop(listener);
    Ok::<(), Error>(())
});

问题:

上面的代码通过本地 TcpListener 端口成功建立了 SSH 隧道,但它有一个限制:客户端只能创建一个到该端口的连接,后续的连接尝试将被阻止。理想情况下,我想支持多个连接(到 TcpListener)。

我的环境:

  • russh = "0.45.0"
  • tokio = "1"

有没有办法让russh支持多连接?或者如何修改代码来实现这个需求?

单个 SSH 隧道支持多个连接。

rust ssh-tunnel
1个回答
0
投票

根据@maxy的建议

每次连接都会调用

channel_open_direct_tcpip()

我将打开通道的行为从 SSH 客户端移至 TCP 侦听器的接受代码块中。另外,我将退出信号监听方法从

rx_clone.changed().await.is_ok()
修改为
rx_clone.has_changed()?

最终代码如下:

use anyhow::{Error, Result};
use async_trait::async_trait;
use log::{error, info, warn};
use russh::client::{Config, Handler, Msg, Session};
use russh::keys::key;
use russh::{Channel, ChannelId, Disconnect};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::select;

#[derive(Clone, Debug)]
pub struct SshTunnel {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub forwarding_host: String,
    pub forwarding_port: u16,
    tx: tokio::sync::watch::Sender<u8>,
    rx: tokio::sync::watch::Receiver<u8>,
    is_connected: bool,
}

impl SshTunnel {
    pub fn new(host: String, port: u16, username: String, password: String, forwarding_host: String, forwarding_port: u16) -> Self {
        let (tx, rx) = tokio::sync::watch::channel::<u8>(1);
        Self {
            host,
            port,
            username,
            password,
            forwarding_host,
            forwarding_port,
            tx,
            rx,
            is_connected: false,
        }
    }

    pub async fn open(&mut self) -> Result<SocketAddr> {
        let mut ssh_client = russh::client::connect(
            Arc::new(Config::default()),
            format!("{}:{}", self.host, self.port),
            IHandler {},
        ).await?;
        ssh_client.authenticate_password(self.username.clone(), self.password.clone()).await?;
        let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).await?;
        let addr = listener.local_addr()?;
        let forwarding_host = self.forwarding_host.clone();
        let forwarding_port = self.forwarding_port as u32;

        let mut rx_clone = self.rx.clone();
        tokio::spawn(async move {
            loop {
                let mut rx_clone_clone = rx_clone.clone();
                if let Ok((mut local_stream, _)) = listener.accept().await {
                    let channel = ssh_client.channel_open_direct_tcpip(
                        forwarding_host.clone(),
                        forwarding_port,
                        Ipv4Addr::LOCALHOST.to_string(),
                        addr.port() as u32,
                    ).await?;
                    let mut remote_stream = channel.into_stream();
                    tokio::spawn(async move {
                        select! {
                            result = tokio::io::copy_bidirectional_with_sizes(&mut local_stream, &mut remote_stream, 255, 8 * 1024) => {
                                if let Err(e) = result {
                                    error!("Error during bidirectional copy: {}", e);
                                }
                                warn!("Bidirectional copy stopped");
                            }
                            _ = rx_clone_clone.changed() => {
                                info!("Received close signal");
                            }
                        }
                        let _ = remote_stream.shutdown().await;
                    });
                }
                if rx_clone.has_changed()? {
                    ssh_client.disconnect(Disconnect::ByApplication, "exit", "none").await?;
                    break;
                }
            }
            drop(listener);
            info!("Stream closed");
            Ok::<(), Error>(())
        });

        self.is_connected = true;
        Ok(addr)
    }

    pub async fn close(&mut self) -> Result<()> {
        self.tx.send(0)?;
        self.is_connected = false;
        Ok(())
    }

    pub fn is_connected(&self) -> bool {
        self.is_connected
    }
}

struct IHandler;

#[async_trait]
impl Handler for IHandler {
    type Error = Error;
    async fn check_server_key(&mut self, _: &key::PublicKey) -> Result<bool, Self::Error> {
        Ok(true)
    }
}
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.