如何异步处理子进程的I / O? [重复]

问题描述 投票:0回答:1

我有一个子进程,可能会或可能不会在特定的时间内向它的stdout写一些东西,例如3秒。

如果子进程标准输出中的新行以正确的东西开头,我想返回该行。最好我想实现这样的事情:

use std::io::{BufRead, BufReader};
use std::thread;
use std::time::Duration;

pub fn wait_for_or_exit(
    reader: &BufReader<&mut std::process::ChildStdout>,
    wait_time: u64,
    cmd: &str,
) -> Option<String> {
    let signal: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
    let signal_clone = signal.clone();
    let child = thread::spawn(move || {
        thread::sleep(Duration::from_millis(wait_time));
        signal_clone.store(true, Ordering::Relaxed);
    });
    let mut line = String::new();
    while !signal.load(Ordering::Relaxed) {
        //Sleep a really small amount of time not to block cpu
        thread::sleep(Duration::from_millis(10));
        //This line is obviously invalid!
        if reader.has_input() {
            line.clear();
            reader.read_line(&mut line).unwrap();
            if line.starts_with(cmd) {
                return Some(line);
            }
        }
    }
    None
}

唯一没有在这里工作的是reader.has_input()

显然,如果子进程的响应速度比wait_time快了很多次,那么就会有很多睡眠线程,但我可以用通道来处理它。

asynchronous rust subprocess
1个回答
1
投票

有两种方法。

  1. 您可以启动一个单独的线程,然后使用某种机制(可能是一个通道)来向您的等待线程发出成功或失败的信号。
  2. 您可以使用您提到的异步IO,例如期货和tokio lib。

我会演示两个。我更喜欢期货/ Tokio方法,但如果你不熟悉期货模型,那么期权一可能会更好。

Rust stdlib有一个Channels API,这个频道实际上有一个recv_timeout,可以帮助我们相当多。

use std::thread;
use std::time::Duration;
use std::sync::mpsc;

// this spins up a separate thread in which to wait for stuff to read
// from the BufReader<ChildStdout> 
// If we successfully read, we send the string over the Channel.
// Back in the original thread, we wait for an answer over the channel
// or timeout in wait_time secs. 
pub fn wait_for_or_exit(
    reader: &BufReader<&mut std::process::ChildStdout>,
    wait_time: u64,
    cmd: &str,
) -> Option<String> {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        let line = reader.read_line();
        sender.send(line);
    });

    match receiver.recv_timeout(Duration::from_secs(wait_time)) {
        Ok(line) => if line.starts_with(cmd) 
           { Some(line) } else 
           { None },
        Err(mpsc::RecvTimeoutError::Timeout) => None,
        Err(mpsc::RecvTimeoutError::Disconnected) => None  

    }
}

选项二假设您正在构建基于未来的应用程序。为了实现你想要的东西使用Async IO是一个文件描述符,让我们设置NON_BLOCKING。幸运的是,我们自己不必这样做。 Futures和Tokio API很好地处理了这个问题。权衡的是,你必须用非阻塞期货来编写你的代码。

下面的代码几乎全部来自Tokio Process,期货超时来自Tokio API。

extern crate futures;
extern crate tokio;
extern crate tokio_process;

use std::process::Command;
use std::time::{Duration};

use futures::Future;
use tokio_process::CommandExt;
use tokio::prelude::*;

const TIMEOUT_SECS: u64 = 3;

fn main() {
    // Like above, but use `output_async` which returns a future instead of
    // immediately returning the `Child`.
    let output = Command::new("echo").arg("hello").arg("world")
                        .output_async();

    let future = output.map_err(|e| panic!("failed to collect output: {}", e))
        .map(|output| {
            assert!(output.status.success());
            assert_eq!(output.stdout, b"hello world\n");
            println!("received output: {}",     String::from_utf8(output.stdout).unwrap());
        })
        .timeout(Duration::from_secs(TIMEOUT_SECS)) // here is where we say we only want to wait TIMETOUT seconds
        .map_err(|_e| { println!("Timed out waiting for data"); });

    tokio::run(future);
}
© www.soinside.com 2019 - 2024. All rights reserved.