如何在 Rust 中从 TcpStream 读取 leb128

问题描述 投票:0回答:1

简而言之,我需要从 C# 客户端发送的 TCP 连接中读取字符串。客户端使用

BinaryWriter
,它以 leb128 格式的长度作为实际字符串的前缀。我在 Rust 中使用
tokio::net::TcpStream
并搜索了一个箱子来帮助我从流中检索该长度前缀,但我找不到任何合适的东西。大多数解决方案需要您正在阅读的源代码来实现
io::Read
特征,但
tokio::net::TcpStream
并没有实现它。

我设法让它与这个丑陋的代码一起工作,但我从一开始就对此表示怀疑。我最近发现它有时会导致某种竞争条件。我不完全确定,但我认为它在

let file_name_len = leb128::read::unsigned(&mut socket)?;
上被阻止,这以某种方式导致我的 TCP 侦听器停止接受新连接,这更奇怪。

let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;

let file_name_len = leb128::read::unsigned(&mut socket)?;

let mut socket = tokio::net::TcpStream::from_std(socket)?;

有人知道正确的方法吗?

sockets network-programming rust tcp protocols
1个回答
0
投票

我不完全确定,但我认为它在

let file_name_len = leb128::read::unsigned(&mut socket)?;
上被阻止,这不知何故导致我的 TCP 侦听器停止接受新连接,这更奇怪。

上面的代码是阻塞

  1. 您将套接字设置为阻塞 (
    set_nonblocking(false)
    )。
  2. 然后封锁
    leb128::read::unsigned(&mut socket)?;

这将阻塞整个 tokio 线程。

不应该阻止TCP侦听器如果TCP侦听器在单独的任务中运行并且您正在使用(默认)多线程tokio运行时......当然除非您有多个这样的LEB任务阻塞每个 tokio 线程。


不幸的是,异步读取没有标准 API,并且

leb128
crate 不提供任何 tokio 集成,因此需要做一些工作。

不过,不会太多,因为

&[u8]
实现了
Read
,并且在
Read
之后,切片将被更新为指向未读字节。

由于您使用的是 TCP,我假设您已经为收到的字节准备了某种缓冲区——将它们传递给解码器——所以您应该只使用该缓冲区

//  Read until sufficient bytes are obtained to determine length.
let length = loop {
    const UNEXPECTED_EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;

    let mut slice = buffer.readable();

    let result = leb128::read::unsigned(&mut slice);

    match result {
        Ok(length) => {
            let consumed = buffer.readable().len() - slice.len();

            buffer.advance(consumed);

            break length;
        },
        Err(leb128::read::Error::IoError(e)) if e.kind() == UNEXPECTED_EOF =>
            continue,
        Err(e) => return Err(e.into()),
    }

    socket.readable().await?;

    let length = socket.try_read(buffer.writable())?;

    todo!("Handle length, beware 0 means EOF");
};

//  Do something with buffered bytes, perhaps waiting for more
//  (now that you know how many you need).
todo!();

不过,我觉得上面的代码结构不是很好。混合异步 I/O 和解码意味着您无法单独测试解码,这很痛苦,我真的建议尽可能选择 Sans IO 设计。

相反,我鼓励您编写一个

Framer
Decoder
来处理部分(或全部)解码逻辑,并将 I/O 与成帧/解码完全分开。

这个想法相对简单:将字节推入其中,获取帧字节或解码消息。

由于我没有您的解码器,因此我将使用成帧器,其作用是隔离流中的单个帧(编码消息)。

一旦你有了一个成帧器,它实际上相对简单:

socket.readable().await?;

let mut buffer = [0; 16 * 1024];
let length = socket.try_read(&mut buffer)?;

if length == 0 {
    todo!("handle EOF");
}

framer.push(&buffer[..length]);

//  May want to limit the number of frames handled at once, to avoid blocking
//  other clients.

while let Some(message) = framer.pop()? {
    todo!("handle message");
}

而且非常重要的是,很容易测试成帧器可以处理各种消息。

实际的成帧器代码相对简单:

//  Disclaimer: I have not even _compiled_ this code, don't expect it to handle
//              all the edge cases.

#[derive(Clone, Debug, Default)]
pub struct Framer {
    state: FramerState,
    consumed: usize,
    buffer: Vec<u8>,
}

impl Framer {
    /// Constructs a framer with a specific capacity.
    pub fn with_capacity(capacity: usize) -> Self {
        let state = FramerState::default();
        let consumed = 0;
        let buffer = Vec::with_capacity(capacity);

        Self { state, consumed, buffer }
    }

    /// Pushes bytes into the framer.
    pub fn push(&mut self, bytes: &[u8]) {
        //  Trick here: draining in push is easier, and avoids O(N²) pop.
        if self.consumed > 0 {
            self.buffer.drain(..self.consumed);
        }

        self.buffer.extend_from_slice(bytes);
    }

    /// Pops a frame, if possible.
    ///
    /// Call repeatedly to pop all buffered frames, when no complete frame is
    /// buffered any longer, returns `Ok(None)`.
    ///
    /// Returns an error if the underlying stream is faulty, for example has
    /// an overflowing length.
    pub fn pop(&mut self) -> Result<Option<&[u8]>, Error> {
        match self.state {
            FramerState::WaitingForLength => {
                let length = self.pop_length()?;

                let Some(length) = length else { return Ok(None) };

                let Some(length) = NonZeroU64::new(length) else {
                    return Ok(Some(&[]));
                };

                //  FIXME: may want a maximum length here, regardless of
                //         overflow, as otherwise a client accidentally
                //         sending 2^63-1 LEB encoded will lead the server
                //         to wait forever.

                self.state = FramerState::WaitingForMessage(length);

                self.pop_message(length)
            }
            FramerState::WaitingForMessage(length) => {
                self.pop_message(length)
            }
        }
    }

    //  Pops the length.
    //
    //  # Panics
    //
    //  In Debug, if called when state is not WaitingForLength.
    fn pop_length(&mut self) -> Result<Option<u64>, Error> {
        const UNEXPECTED_EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;

        debug_assert_eq!(FramerState::WaitingForLength, self.state);

        let mut available = &self.buffer[self.consumed..];

        match leb128::read::unsigned(&mut available) {
            Ok(length) => {
                let consumed = self.buffer.len() - self.consumed - available.len();
                self.consumed += consumed;

                Ok(Some(length))
            },
            Err(leb128::read::Error::IoError(e)) if e.kind() == UNEXPECTED_EOF => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    //  Pops the actual frame, according to the length.
    //
    //  # Panics
    //
    //  In Debug, if called when state is not WaitingForMessage(length).
    fn pop_message(&mut self, length: NonZeroU64) -> Result<Option<&[u8]>, Error> {
        debug_assert_eq!(FramerState::WaitingForMessage(length), self.state);

        let length = length.get().try_into().map_err(|_| Error::Overflow)?;

        let Some((frame, _)) = self.buffer[self.consumed..].split_at_checked(length) else {
            return Ok(None);
        };

        self.state = FramerState::WaitingForLength;
        self.consumed += frame.len();

        Ok(Some(frame))
    }
}

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum FramerState {
    #[default]
    WaitingForLength,
    WaitingForBytes(NonZeroU64),
}
© www.soinside.com 2019 - 2024. All rights reserved.