当使用来自`fork`创建的多个C线程的回调函数时,Rust Mutex无法正常工作

问题描述 投票:3回答:1

我正在使用C库Cuba,它使用从C中创建的多个线程调用的回调函数。古巴并行化基于fork / wait POSIX函数而不是pthreads(arxiv.org/abs/1408.6373)。它给出了core参数中的当前线程。

我试图将此回调函数的结果记录到屏幕和文件中。如果我使用println!我得到预期的输出,但是如果我使用slog,当我使用Mutex排水时输出会受损。如果我使用async排水管,我根本就没有输出。

是不是Mutex锁定,因为它无法看到该函数实际上是从另一个线程调用的?我试图用Rust线程重新创建问题,但不能。我最好还是让async排水管工作。

下面是一个示例程序,它给出了有问题的行为。回调将vegas函数的最后一个参数作为其参数之一。这是记录器克隆的向量。这样,每个核心都应该拥有自己的记录器副本:

#[macro_use]
extern crate slog;
extern crate cuba;
extern crate slog_term;

use slog::Drain;

// this function is called from different c threads
// `core` indicates which thread
fn integrand(
    _x: &[f64],
    _f: &mut [f64],
    loggers: &mut Vec<slog::Logger>,
    _nvec: usize,
    core: i32,
) -> Result<(), &'static str> {
    info!(loggers[core as usize], "A\nB\nC");

    Ok(())
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let mut integrator = cuba::CubaIntegrator::new(integrand);
    integrator.set_cores(10, 1000); // set 10 cores

    integrator.vegas(
        1,
        1,
        cuba::CubaVerbosity::Progress,
        0,
        vec![log.clone(); 11],
    );
}

输出:

C 
INFO Mar 26A
B
C 10:27
:42.195 MarINFO 26  10:27A
B
C:42.195
 MarINFO 26  10:27A
B
C:42.195
 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
multithreading rust mutex ffi
1个回答
8
投票

古巴C图书馆has this to say

Windows用户:Cuba 3及更高版本使用fork(2)来并行化执行线程。但是,这个POSIX函数不是Windows API的一部分,而且还以一种必不可少的方式使用,使得它无法简单地使用CreateProcess等进行处理。唯一可行的仿真似乎可以通过Cygwin获得。

这是代码的复制品。我们fork然后孩子和父母试图在打印东西时拿着互斥锁。插入sleep以鼓励OS调度程序尝试其他线程:

use nix::unistd::{fork, ForkResult}; // 0.13.0
use std::{sync::Mutex, thread, time::Duration};

fn main() {
    let shared = Mutex::new(10);

    match fork() {
        Ok(ForkResult::Parent { .. }) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Parent");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Ok(ForkResult::Child) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Child");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}
$ cargo run

Parent
Child
Parent
Child
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent

使用带有线程的fork真的很难处理;我清楚地记得以前找到与此相关的可怕问题。我发现深入的两个资源:

后者说(强调我的):

我可以在分叉之前创建互斥吗?

是 - 但是子进程和父进程不共享虚拟内存,并且每个进程都将具有独立于另一个的互斥锁。

(高级注释:使用共享内存的高级选项允许子级和父级共享互斥锁,如果它使用正确的选项创建并使用共享内存段。请参阅procs, fork(), and mutexes


如果我使用异步消耗,我根本就没有输出。

也可以看看:


我不相信古巴鲁斯特图书馆。有两个要点:

  1. 如果正在创建线程,则用户数据泛型类型应该绑定SyncSend,仅限制在线程之间共享/传输数据的安全类型。
  2. 传递给integrand函数的用户数据不应该是&mut。 Rust的一个基本概念是,任何时候都只能对任何数据进行单一的可变引用。古巴琐碎地允许你规避这一点。

以下是古巴Rust和C图书馆的复制尝试:

#[macro_use]
extern crate slog;

use slog::Drain;

fn integrand(loggers: &mut Vec<slog::Logger>, core: i32) {
    info!(loggers[core as usize], "A\nB\nC\n{}", core);
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let logs = vec![log.clone(); 11];

    cuba_repro(logs, integrand);
}

use std::{ffi::c_void, thread};

type Integrand<T> = fn(&mut T, i32);

fn cuba_repro<T>(mut user_data: T, mut integrand: Integrand<T>) {
    // From the `vegas` method
    let user_data_ptr = &mut user_data as *mut _ as *mut c_void;
    let integrand_ptr = &mut integrand as *mut _ as *mut c_void;

    unsafe { cuba_repro_ffi::<T>(user_data_ptr, integrand_ptr) }
}

unsafe fn cuba_repro_ffi<T>(user_data: *const c_void, integrand: *const c_void) {
    let user_data = FfiDoesNotCareAboutSendOrSync(user_data);
    let integrand = FfiDoesNotCareAboutSendOrSync(integrand);

    let threads: Vec<_> = (0..4).map(move |i| {
        thread::spawn(move || {
            // C doesn't care about this pedantry
            let user_data = &mut *(user_data.0 as *const T as *mut T);
            let integrand = &mut *(integrand.0 as *const Integrand<T> as *mut Integrand<T>);

            // From the `c_integrand` function
            let k: &mut T = &mut *(user_data as *mut _);
            let _ignored = integrand(k, i);
        })
    }).collect();

    for t in threads { t.join().unwrap() }
}

#[derive(Copy, Clone)]
struct FfiDoesNotCareAboutSendOrSync<T>(T);
unsafe impl<T> Send for FfiDoesNotCareAboutSendOrSync<T> {}
unsafe impl<T> Sync for FfiDoesNotCareAboutSendOrSync<T> {}

我不得不进行大量修改,以使Rust编译器忽略古巴图书馆和相关FFI正在执行的大量不安全和违规行为。

这个代码示例实际上按顺序打印出4个日志语句,因此这不是一个完整的答案。但是,我很确定古巴图书馆正在触发未定义的行为,这意味着任何结果都是可能的,包括显然有效。

© www.soinside.com 2019 - 2024. All rights reserved.