我正在编写一个小应用程序,用于根除重复文件(并练习我的 Rust,我对此相对较新!)。
作为学习的一部分(并且因为我有媒体管理背景,涉及非常大的文件),我决心让它在我的机器的核心上高效执行。 为此,我使用 Tokio 运行时箱和
deadqueue
作为我的队列实现。
所以,我的工作线程代码如下所示(
worker_count
当前已初始化为num_cores
给出的核心计数,可以通过命令行选项进行自定义):
println!("Starting up {} worker threads...", worker_count);
for n in 0..worker_count {
let rxq = queue.clone();
tokio::spawn(async move {
eprintln!("INFO Thread {}", n);
let _ = io::stderr().flush();
loop {
let ff = rxq.pop().await;
eprintln!("Popped {}", ff.fullpath().display());
process_item(ff).await;
}
});
}
process_item 函数的定义如下:
async fn process_item(mut item:FoundFile) -> () {
match item.calculate_sha().await {
Ok(_)=>{}
Err(e)=>{
println!("ERROR Could not checksum: {}", e)
}
}
()
}
(有一个使用
spawn_blocking
创建的单独线程,它执行异步扫描并将 FoundFile
对象推送到队列中)
calculate_sha()
定义在 impl FoundFile
块中:
pub async fn calculate_sha(&mut self) -> Result<String, Box<dyn Error> > {
let mut file = tokio::fs::File::open(self.fullpath()).await?;
let mut hasher = Box::new(Sha256::new());
//let mut buffer: [u8; BUFFER_SIZE] = [0; BUFFER_SIZE];
let mut buffer = vec![0_u8; BUFFER_SIZE];
eprintln!("SHA calculation on {}", self.fullpath().display());
while file.read(&mut buffer).await? > 0 {
hasher.update(&buffer);
}
let final_result = encode(hasher.finalize());
self.sha = Some(final_result.clone());
Ok( final_result )
}
这与我在 C、C++、Scala 等中所做的基本相同 - 打开文件,创建哈希器,为 1 个块设置本地缓冲区,然后重复从文件填充缓冲区并推入哈希器(更新哈希器状态)直到我们用完文件中的数据。 这样我的内存需求应该限制在 1 个块和哈希器状态的开销。 RAII 应该确保一切都得到清理。 (顺便说一句,
encode
函数是hex::encode
- 与这里无关!)
编译器可以接受所有这些,但是当我实际尝试运行它时,我发现毫无帮助:
Starting up 8 worker threads...
thread 'tokio-runtime-worker' has overflowed its stack
fatal runtime error: stack overflow
注释掉
file.read() / hasher.update()
块意味着它运行良好(当然,实际上没有做任何事情!)。
不过,我不知道是什么导致了溢出。 我没有递归任何内容,只是使用本地缓冲区重复调用 hasher.update 。 正如您从上面的代码中看到的,我尝试分别使用
hasher
和 buffer
来确保 Box
和 Vec
都在堆上 - 并确保 buffer
已初始化 - 但没有其他比注释掉循环似乎有任何区别。
最奇怪的是,当我使用
eprintln!
语句跟踪执行时(假设 STDERR 输出没有被缓冲,就像我使用的其他语言一样),那么我什至看不到 INFO Thread {n}
行。
显然在幕后发生了一些更微妙的事情,但我对它是什么有点茫然——有比我更了解 Tokio 的人可以解释一下吗?
非常感谢!
您的代码中没有无限的堆栈使用,它只是需要更大的堆栈。
没有一种可移植的方法来增加主线程的堆栈大小(尽管可以使用链接器参数),但对于生成的线程,Rust 提供了
std::thread::Builder::stack_size()
。
但是,您的线程是
tokio
工作人员,因此您需要另一种方式。幸运的是,tokio
还提供了一种配置工作线程堆栈大小的方法,在构建运行时时使用tokio::runtime::Builder::thread_stack_size()
。
另请注意,优化的构建通常使用较少的堆栈存储,并且您的代码可以在不修改的情况下使用它们。