tokio-runtime-worker 哈希函数中的堆栈溢出

问题描述 投票:0回答:1

我正在编写一个小应用程序,用于根除重复文件(并练习我的 Rust,我对此相对较新!)。

作为学习的一部分(并且因为我有媒体管理背景,涉及非常大的文件),我决心让它在我的机器的核心上高效执行。 为此,我使用 Tokio 运行时箱和

deadqueue
作为我的队列实现。

所以,我的工作线程代码如下所示(

worker_count
当前已初始化为
num_cores
给出的核心计数,可以通过命令行选项进行自定义):

    println!("Starting up {} worker threads...", worker_count);
    for n in 0..worker_count {
        let rxq = queue.clone();
        tokio::spawn(async move {
            eprintln!("INFO Thread {}", n);
            let _ = io::stderr().flush();
            loop {
                let ff = rxq.pop().await;
                eprintln!("Popped {}", ff.fullpath().display());
                
                process_item(ff).await;
            }
        });
    }

process_item 函数的定义如下:

async fn process_item(mut item:FoundFile) -> () {
    match item.calculate_sha().await {
        Ok(_)=>{}
        Err(e)=>{
            println!("ERROR Could not checksum: {}", e)
        }
    }
    ()
}

(有一个使用

spawn_blocking
创建的单独线程,它执行异步扫描并将
FoundFile
对象推送到队列中)

calculate_sha()
定义在
impl FoundFile
块中:

    pub async fn calculate_sha(&mut self) -> Result<String, Box<dyn Error> > {
        let mut file = tokio::fs::File::open(self.fullpath()).await?;
        let mut hasher = Box::new(Sha256::new());

        //let mut buffer: [u8; BUFFER_SIZE] = [0; BUFFER_SIZE];
        let mut buffer = vec![0_u8; BUFFER_SIZE];

        eprintln!("SHA calculation on {}", self.fullpath().display());


        while file.read(&mut buffer).await? > 0 {
            hasher.update(&buffer);
        }

        let final_result = encode(hasher.finalize());
        self.sha = Some(final_result.clone());
        Ok( final_result )
    }

这与我在 C、C++、Scala 等中所做的基本相同 - 打开文件,创建哈希器,为 1 个块设置本地缓冲区,然后重复从文件填充缓冲区并推入哈希器(更新哈希器状态)直到我们用完文件中的数据。 这样我的内存需求应该限制在 1 个块和哈希器状态的开销。 RAII 应该确保一切都得到清理。 (顺便说一句,

encode
函数是
hex::encode
- 与这里无关!)

编译器可以接受所有这些,但是当我实际尝试运行它时,我发现毫无帮助:

Starting up 8 worker threads...

thread 'tokio-runtime-worker' has overflowed its stack
fatal runtime error: stack overflow

注释掉

file.read() / hasher.update()
块意味着它运行良好(当然,实际上没有做任何事情!)。

不过,我不知道是什么导致了溢出。 我没有递归任何内容,只是使用本地缓冲区重复调用 hasher.update 。 正如您从上面的代码中看到的,我尝试分别使用

hasher
buffer
来确保
Box
Vec
都在堆上 - 并确保
buffer
已初始化 - 但没有其他比注释掉循环似乎有任何区别。

最奇怪的是,当我使用

eprintln!
语句跟踪执行时(假设 STDERR 输出没有被缓冲,就像我使用的其他语言一样),那么我什至看不到
INFO Thread {n}
行。

显然在幕后发生了一些更微妙的事情,但我对它是什么有点茫然——有比我更了解 Tokio 的人可以解释一下吗?

非常感谢!

multithreading rust rust-tokio
1个回答
0
投票

您的代码中没有无限的堆栈使用,它只是需要更大的堆栈。

没有一种可移植的方法来增加主线程的堆栈大小(尽管可以使用链接器参数),但对于生成的线程,Rust 提供了

std::thread::Builder::stack_size()

但是,您的线程是

tokio
工作人员,因此您需要另一种方式。幸运的是,
tokio
还提供了一种配置工作线程堆栈大小的方法,在构建运行时时使用
tokio::runtime::Builder::thread_stack_size()

另请注意,优化的构建通常使用较少的堆栈存储,并且您的代码可以在不修改的情况下使用它们。

© www.soinside.com 2019 - 2024. All rights reserved.