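In short, I need to read strings from a TCP connection opened by a C# client. The client uses BinaryWriter, which prefixes the actual string with its length in LEB128 format. In Rust I'm using tokio::net::TcpStream, and I searched for a crate to help me retrieve that length prefix from the stream, but I couldn't find anything suitable. Most solutions require the source you're reading from to implement the io::Read trait, but tokio::net::TcpStream doesn't implement it.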
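For reference, BinaryWriter.Write(string) sends the string's byte length (UTF-8 with the default encoding) as a 7-bit-encoded integer, which is unsigned LEB128, followed by the bytes themselves. The sketch below reproduces that wire format in Rust, which can be handy for generating test input on the Rust side. The function name write_prefixed_string is made up for illustration.

```rust
/// Hypothetical helper: encodes `s` the way C#'s `BinaryWriter.Write(string)`
/// does with the default UTF-8 encoding, i.e. an unsigned LEB128 byte count
/// followed by the UTF-8 bytes.
pub fn write_prefixed_string(out: &mut Vec<u8>, s: &str) {
    // The prefix counts bytes, not characters.
    let mut len = s.len() as u64;
    loop {
        // Take the low 7 bits of what is left.
        let byte = (len & 0x7F) as u8;
        len >>= 7;
        if len == 0 {
            // Last byte: continuation bit (0x80) clear.
            out.push(byte);
            break;
        }
        // More bytes follow: set the continuation bit.
        out.push(byte | 0x80);
    }
    out.extend_from_slice(s.as_bytes());
}
```

So "hi" goes on the wire as [2, b'h', b'i'], while a 300-byte string needs a two-byte prefix, [0xAC, 0x02].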
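I managed to get it working with this ugly code, but I was suspicious of it from the start. I recently found that it sometimes causes some kind of race condition. I'm not entirely sure, but I think it blocks on
let file_name_len = leb128::read::unsigned(&mut socket)?;
which somehow causes my TCP listener to stop accepting new connections, which is even stranger.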
let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;
let file_name_len = leb128::read::unsigned(&mut socket)?;
let mut socket = tokio::net::TcpStream::from_std(socket)?;
Does anyone know the right way to do this?
"I'm not entirely sure, but I think it blocks on
let file_name_len = leb128::read::unsigned(&mut socket)?;
which somehow causes my TCP listener to stop accepting new connections, which is even stranger."
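The code above is blocking (set_nonblocking(false) switches the socket to blocking mode), so leb128::read::unsigned(&mut socket)? blocks the entire tokio worker thread until the bytes arrive.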
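It should not block the TCP listener, provided the listener runs in a separate task and you are using the (default) multi-threaded tokio runtime... unless, of course, you have enough of these LEB-reading tasks to block every tokio worker thread.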
Unfortunately, there is no standard API for asynchronous reads, and the leb128 crate does not provide any tokio integration, so some work is needed.
Not much, though, because &[u8] implements Read, and after a read the slice is updated to point at the bytes that were not consumed.
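A quick std-only illustration of that slice behaviour follows. In case the leb128 crate is not at hand, read_uleb128 is a hand-written stand-in for leb128::read::unsigned (same idea: pull bytes one at a time from any io::Read). Both function names are made up for this sketch.

```rust
use std::io::{self, Read};

/// Minimal hand-written unsigned LEB128 reader (stand-in for the crate).
/// Rejects encodings longer than 10 bytes; it does not check the unused
/// high bits of the 10th byte.
pub fn read_uleb128<R: Read>(r: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0u32;
    loop {
        let mut byte = [0u8; 1];
        // On an exhausted `&[u8]` this fails with `UnexpectedEof`.
        r.read_exact(&mut byte)?;
        if shift >= 64 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "LEB128 overflow"));
        }
        result |= u64::from(byte[0] & 0x7F) << shift;
        if byte[0] & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
    }
}

/// Decodes a length prefix from the front of `buffer` and reports how many
/// bytes it occupied, without copying anything.
pub fn decode_prefix(buffer: &[u8]) -> io::Result<(u64, usize)> {
    // `&[u8]` implements `Read`; reading shrinks `slice` from the front.
    let mut slice = buffer;
    let value = read_uleb128(&mut slice)?;
    // `slice` now covers only the unread bytes, so the length difference
    // is exactly the size of the prefix.
    Ok((value, buffer.len() - slice.len()))
}
```

An incomplete prefix surfaces as UnexpectedEof, which is the "wait for more bytes" signal used in the loop below.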
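Since you are using TCP, I assume you already have some kind of buffer for the bytes you receive (to pass them on to the decoder), so you should simply use that buffer.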
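// `buffer` stands for your own receive buffer (hypothetical API):
// `readable()` = received but unconsumed bytes, `writable()` = spare room,
// `advance(n)` = consume n bytes, `commit(n)` = n bytes were just received.
// Read until sufficient bytes are obtained to determine length.
let length = loop {
    const UNEXPECTED_EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;

    // `&[u8]` implements `Read`, so decode straight from the buffered bytes.
    let mut slice = buffer.readable();
    match leb128::read::unsigned(&mut slice) {
        Ok(length) => {
            // `slice` was advanced past the prefix; consume exactly that much.
            let consumed = buffer.readable().len() - slice.len();
            buffer.advance(consumed);
            break length;
        }
        // Prefix incomplete: fall through and read more bytes.
        Err(leb128::read::Error::IoError(e)) if e.kind() == UNEXPECTED_EOF => {}
        Err(e) => return Err(e.into()),
    }

    socket.readable().await?;
    match socket.try_read(buffer.writable()) {
        Ok(0) => todo!("Handle EOF"),
        Ok(n) => buffer.commit(n),
        // Readiness can be a false positive; just wait again.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
        Err(e) => return Err(e.into()),
    }
};

// Do something with buffered bytes, perhaps waiting for more
// (now that you know how many you need).
todo!();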
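The loop above leans on a buffer exposing readable(), writable() and advance(). If you don't already have one, here is a minimal hypothetical version over a Vec<u8>. The RecvBuffer name and its commit(n) method (recording how many bytes a read filled in) are assumptions for illustration, not an existing crate API.

```rust
/// Hypothetical receive buffer: `data[start..end]` holds received bytes not
/// yet consumed, `data[end..]` is spare room for the next read.
pub struct RecvBuffer {
    data: Vec<u8>,
    start: usize,
    end: usize,
}

impl RecvBuffer {
    pub fn with_capacity(capacity: usize) -> Self {
        Self { data: vec![0; capacity], start: 0, end: 0 }
    }

    /// Bytes received but not yet consumed.
    pub fn readable(&self) -> &[u8] {
        &self.data[self.start..self.end]
    }

    /// Spare room to read into; never empty.
    pub fn writable(&mut self) -> &mut [u8] {
        // Move unread bytes to the front so the spare room is contiguous.
        if self.start > 0 {
            self.data.copy_within(self.start..self.end, 0);
            self.end -= self.start;
            self.start = 0;
        }
        // Grow when full, so a read never receives an empty slice.
        if self.end == self.data.len() {
            let new_len = (self.data.len() * 2).max(1024);
            self.data.resize(new_len, 0);
        }
        &mut self.data[self.end..]
    }

    /// Marks `n` readable bytes as consumed.
    pub fn advance(&mut self, n: usize) {
        assert!(n <= self.end - self.start, "advanced past readable bytes");
        self.start += n;
    }

    /// Marks `n` bytes at the start of `writable()` as received.
    pub fn commit(&mut self, n: usize) {
        assert!(n <= self.data.len() - self.end, "committed past spare room");
        self.end += n;
    }
}
```

Compacting in writable() keeps readable() a single contiguous slice, which is what leb128::read::unsigned needs. Growing when full matters because tokio documents that try_read returning 0 can mean either a closed stream or a zero-length buffer, so an empty writable() would look like EOF.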
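That said, I don't find the code above very well structured. Mixing asynchronous I/O with decoding means you cannot test the decoding on its own, which is painful; I really recommend a Sans IO design whenever possible.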
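Instead, I encourage you to write a Framer or a Decoder that handles part (or all) of the decoding logic, and to keep I/O completely separate from framing/decoding.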
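The idea is relatively simple: push bytes in, get framed bytes or decoded messages out.
Since I don't have your decoder, I'll use a framer, whose job is to isolate individual frames (encoded messages) in the stream.
Once you have a framer, the I/O side becomes fairly simple: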
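loop {
    socket.readable().await?;

    let mut buffer = [0; 16 * 1024];
    let length = match socket.try_read(&mut buffer) {
        Ok(0) => todo!("handle EOF"),
        Ok(length) => length,
        // Readiness can be a false positive; just wait again.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
        Err(e) => return Err(e.into()),
    };

    framer.push(&buffer[..length]);

    // May want to limit the number of frames handled at once, to avoid
    // blocking other clients.
    while let Some(message) = framer.pop()? {
        todo!("handle message");
    }
}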
And, very importantly, it is easy to test that the framer handles all kinds of messages.
The framer itself is relatively simple:
// Disclaimer: I have not even _compiled_ this code, don't expect it to handle
// all the edge cases.

use std::io;
use std::num::NonZeroU64;

/// Framing errors (minimal definition; adapt to your own error type).
#[derive(Debug)]
pub enum Error {
    /// Malformed LEB128 length prefix, e.g. longer than 64 bits.
    Leb128(leb128::read::Error),
    /// Frame length does not fit in `usize` on this platform.
    Overflow,
}

impl From<leb128::read::Error> for Error {
    fn from(e: leb128::read::Error) -> Self {
        Error::Leb128(e)
    }
}

#[derive(Clone, Debug, Default)]
pub struct Framer {
    state: FramerState,
    // Number of bytes at the front of `buffer` already handed out.
    consumed: usize,
    buffer: Vec<u8>,
}

impl Framer {
    /// Constructs a framer with a specific capacity.
    pub fn with_capacity(capacity: usize) -> Self {
        let state = FramerState::default();
        let consumed = 0;
        let buffer = Vec::with_capacity(capacity);
        Self { state, consumed, buffer }
    }

    /// Pushes bytes into the framer.
    pub fn push(&mut self, bytes: &[u8]) {
        // Trick here: draining in push is easier, and avoids O(N²) pop.
        if self.consumed > 0 {
            self.buffer.drain(..self.consumed);
            // The drained bytes are gone, so nothing is consumed any more.
            self.consumed = 0;
        }
        self.buffer.extend_from_slice(bytes);
    }

    /// Pops a frame, if possible.
    ///
    /// Call repeatedly to pop all buffered frames; when no complete frame is
    /// buffered any longer, returns `Ok(None)`.
    ///
    /// Returns an error if the underlying stream is faulty, for example has
    /// an overflowing length.
    pub fn pop(&mut self) -> Result<Option<&[u8]>, Error> {
        match self.state {
            FramerState::WaitingForLength => {
                let length = self.pop_length()?;
                let Some(length) = length else { return Ok(None) };
                // A zero-length frame is complete as soon as its prefix is.
                let Some(length) = NonZeroU64::new(length) else {
                    return Ok(Some(&[]));
                };
                // FIXME: may want a maximum length here, regardless of
                //        overflow, as otherwise a client accidentally
                //        sending 2^63-1 LEB encoded will lead the server
                //        to wait forever.
                self.state = FramerState::WaitingForMessage(length);
                self.pop_message(length)
            }
            FramerState::WaitingForMessage(length) => self.pop_message(length),
        }
    }

    // Pops the length.
    //
    // # Panics
    //
    // In Debug, if called when state is not WaitingForLength.
    fn pop_length(&mut self) -> Result<Option<u64>, Error> {
        const UNEXPECTED_EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;

        debug_assert_eq!(FramerState::WaitingForLength, self.state);

        let mut available = &self.buffer[self.consumed..];
        match leb128::read::unsigned(&mut available) {
            Ok(length) => {
                // `available` shrank by exactly the size of the prefix.
                let consumed = self.buffer.len() - self.consumed - available.len();
                self.consumed += consumed;
                Ok(Some(length))
            }
            // Prefix not fully received yet: consume nothing, try again later.
            Err(leb128::read::Error::IoError(e)) if e.kind() == UNEXPECTED_EOF => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    // Pops the actual frame, according to the length.
    //
    // # Panics
    //
    // In Debug, if called when state is not WaitingForMessage(length).
    fn pop_message(&mut self, length: NonZeroU64) -> Result<Option<&[u8]>, Error> {
        debug_assert_eq!(FramerState::WaitingForMessage(length), self.state);

        let length: usize = length.get().try_into().map_err(|_| Error::Overflow)?;
        let Some((frame, _)) = self.buffer[self.consumed..].split_at_checked(length) else {
            return Ok(None);
        };

        self.state = FramerState::WaitingForLength;
        self.consumed += frame.len();
        Ok(Some(frame))
    }
}

#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
enum FramerState {
    #[default]
    WaitingForLength,
    WaitingForMessage(NonZeroU64),
}
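To illustrate the testing benefit, here is a std-only sketch of the same framing logic written as a pure function, plus a helper that replays one byte stream cut into every possible chunk size, which is awkward to test once a socket is involved. It uses a hand-written LEB128 decoder instead of the leb128 crate, and Split, split_frame and frames_in_chunks are made-up names; this is not the Framer above, just the same idea in miniature.

```rust
/// Outcome of inspecting the front of a buffer (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Split<'a> {
    /// Not enough bytes for a complete frame yet.
    Incomplete,
    /// A complete frame; `used` counts prefix plus payload bytes.
    Frame { payload: &'a [u8], used: usize },
}

/// Pure framing step, no I/O: an unsigned LEB128 length, then that many
/// payload bytes.
pub fn split_frame(buf: &[u8]) -> Result<Split<'_>, &'static str> {
    let mut len: u64 = 0;
    for (i, &byte) in buf.iter().enumerate() {
        // A u64 needs at most 10 LEB128 bytes.
        if i == 10 {
            return Err("LEB128 length prefix too long");
        }
        len |= u64::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            let start = i + 1;
            let len = usize::try_from(len).map_err(|_| "length overflows usize")?;
            return Ok(match buf[start..].get(..len) {
                Some(payload) => Split::Frame { payload, used: start + len },
                None => Split::Incomplete,
            });
        }
    }
    Ok(Split::Incomplete)
}

/// Feeds `stream` in chunks of `chunk` bytes, as a socket might deliver it,
/// and collects every complete frame. Stops extracting at the first
/// incomplete or malformed frame in the pending bytes.
pub fn frames_in_chunks(stream: &[u8], chunk: usize) -> Vec<Vec<u8>> {
    let mut pending = Vec::new();
    let mut frames = Vec::new();
    for piece in stream.chunks(chunk) {
        pending.extend_from_slice(piece);
        while let Ok(Split::Frame { payload, used }) = split_frame(&pending) {
            frames.push(payload.to_vec());
            pending.drain(..used);
        }
    }
    frames
}
```

A test can then assert that every chunk size from 1 to the stream length yields the same frames, covering prefixes and payloads split at every possible boundary.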