如何在分区数组上运行并行计算线程?

问题描述 投票:4回答:3

我正在尝试跨线程分发数组并让线程并行地对数组的部分进行求和。我希望线程0对元素0 1 2和线程1求和元素3 4 5进行求和。将线程2加到6和7,将线程3加到8和9之和。

我是Rust的新手,但之前用C / C ++ / Java编写过代码。我真的把所有东西都扔到了这个程序的垃圾槽里,我希望能得到一些指导。

对不起我的代码很邋but但是当它是成品时我会把它清理干净。请忽略所有命名不佳的变量/不一致的间距/等。

use std::io;
use std::rand;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread::Thread;

static NTHREADS: usize = 4;
static NPROCS: usize = 10;

fn main() {
    let mut a = [0; 10]; // a: [i32; 10]
    let mut endpoint = a.len() / NTHREADS;
    let mut remElements = a.len() % NTHREADS;

    for x in 0..a.len() {
        let secret_number = (rand::random::<i32>() % 100) + 1;
        a[x] = secret_number;
        println!("{}", a[x]);
    }
    let mut b = a;
    let mut x = 0;

    check_sum(&mut a);
    // serial_sum(&mut b);

    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred
    // (type annotation is superfluous)
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    let mut scale: usize = 0;

    for id in 0..NTHREADS {
        // The sender endpoint can be copied
        let thread_tx = tx.clone();
        // Each thread will send its id via the channel

        Thread::spawn(move || {
            // The thread takes ownership over `thread_tx`
            // Each thread queues a message in the channel
            let numTougherThreads: usize = NPROCS % NTHREADS;
            let numTasksPerThread: usize = NPROCS / NTHREADS;
            let mut lsum = 0;

            if id < numTougherThreads {
                let mut q = numTasksPerThread+1;
                lsum = 0;

                while q > 0 {
                    lsum = lsum + a[scale];
                    scale+=1;
                    q = q-1;
                }
                println!("Less than numToughThreads lsum: {}", lsum);
            }
            if id >= numTougherThreads {
                let mut z = numTasksPerThread;
                lsum = 0;

                while z > 0 {
                    lsum = lsum + a[scale];
                    scale +=1;
                    z = z-1;
                }    
                println!("Greater than numToughthreads lsum: {}", lsum);
            }
            // Sending is a non-blocking operation, the thread will continue
            // immediately after sending its message
            println!("thread {} finished", id);
            thread_tx.send(lsum).unwrap();
        });
    }

    // Here, all the messages are collected
    let mut globalSum = 0;
    let mut ids = Vec::with_capacity(NTHREADS);
    for _ in 0..NTHREADS {
        // The `recv` method picks a message from the channel
        // `recv` will block the current thread if there no messages      available
        ids.push(rx.recv());
    }
    println!("Global Sum: {}", globalSum);
    // Show the order in which the messages were sent

    println!("ids: {:?}", ids);
}

fn check_sum (arr: &mut [i32]) {
    let mut sum = 0;
    let mut i = 0;
    let mut size = arr.len();
    loop {
        sum += arr[i];
        i+=1;
        if i == size { break; }
    }
    println!("CheckSum is {}", sum);
}

到目前为止,我已经做到了这么多。无法弄清楚为什么线程0和1具有相同的总和以及2和3做同样的事情:

 -5
 -49
 -32
 99
 45
 -65
 -64
 -29
 -56
 65
 CheckSum is -91
 Greater than numTough lsum: -54
 thread 2 finished
 Less than numTough lsum: -86
 thread 1 finished
 Less than numTough lsum: -86
 thread 0 finished
 Greater than numTough lsum: -54
 thread 3 finished
 Global Sum: 0
 ids: [Ok(-86), Ok(-86), Ok(-54), Ok(-54)]

我设法使用下面的代码重写它以使用偶数。

    while q > 0 {
        if id*s+scale == a.len() { break; }
        lsum = lsum + a[id*s+scale];
        scale +=1;
        q = q-1;
    }
    println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
    let mut z = numTasksPerThread;
    lsum = 0;
    let mut scale = 0;

    while z > 0 {
        if id*numTasksPerThread+scale == a.len() { break; }
        lsum = lsum + a[id*numTasksPerThread+scale];
        scale = scale + 1;
        z = z-1;
    }
multithreading random synchronization rust
3个回答
8
投票

欢迎来到Rust! :)

是的,起初我没有意识到每个线程都有自己的规模副本

不仅!它也有自己的a副本!

您要做的事情可能类似于以下代码。我想你可以更容易地看到一个完整的工作示例,因为你似乎是一个Rust初学者并且需要指导。我故意用[i32; 10]取代Vec,因为Vec不是隐含的Copyable。它需要一个明确的clone();我们不能意外复制它。请注意所有较大和较小的差异。代码也有一些功能(更少mut)。我评论了大部分值得注意的事情:

extern crate rand;

use std::sync::Arc;
use std::sync::mpsc;
use std::thread;

const NTHREADS: usize = 4; // I replaced `static` by `const`

// gets used for *all* the summing :)
fn sum<I: Iterator<Item=i32>>(iter: I) -> i32 {
    let mut s = 0;
    for x in iter {
        s += x;
    }
    s
}

fn main() {
    // We don't want to clone the whole vector into every closure.
    // So we wrap it in an `Arc`. This allows sharing it.
    // I also got rid of `mut` here by moving the computations into
    // the initialization.
    let a: Arc<Vec<_>> =
        Arc::new(
            (0..10)
                .map(|_| {
                    (rand::random::<i32>() % 100) + 1
                })
                .collect()
        );

    let (tx, rx) = mpsc::channel(); // types will be inferred

    { // local scope, we don't need the following variables outside
        let num_tasks_per_thread = a.len() / NTHREADS; // same here
        let num_tougher_threads = a.len() % NTHREADS; // same here
        let mut offset = 0;
        for id in 0..NTHREADS {
            let chunksize =
                if id < num_tougher_threads {
                    num_tasks_per_thread + 1
                } else {
                    num_tasks_per_thread
                };
            let my_a = a.clone();  // refers to the *same* `Vec`
            let my_tx = tx.clone();
            thread::spawn(move || {
                let end = offset + chunksize;
                let partial_sum =
                    sum( (&my_a[offset..end]).iter().cloned() );
                my_tx.send(partial_sum).unwrap();
            });
            offset += chunksize;
        }
    }

    // We can close this Sender
    drop(tx);

    // Iterator magic! Yay! global_sum does not need to be mutable
    let global_sum = sum(rx.iter());
    println!("global sum via threads    : {}", global_sum);
    println!("global sum single-threaded: {}", sum(a.iter().cloned()));
}

6
投票

使用像crossbeam这样的箱子你可以写下这段代码:

use rand::distributions::{Distribution, Uniform}; // 0.6.5
use crossbeam; // 0.7.1

const NTHREADS: usize = 4;

fn random_vec(length: usize) -> Vec<i32> {
    let step = Uniform::new_inclusive(1, 100);
    let mut rng = rand::thread_rng();
    step.sample_iter(&mut rng).take(length).collect()
}

fn main() {
    let numbers = random_vec(10);
    let num_tasks_per_thread = numbers.len() / NTHREADS;

    crossbeam::scope(|scope| {
        // The `collect` is important to eagerly start the threads!
        let threads: Vec<_> = numbers
            .chunks(num_tasks_per_thread)
            .map(|chunk| scope.spawn(move |_| chunk.iter().cloned().sum::<i32>()))
            .collect();

        let thread_sum: i32 = threads.into_iter().map(|t| t.join().unwrap()).sum();
        let no_thread_sum: i32 = numbers.iter().cloned().sum();

        println!("global sum via threads    : {}", thread_sum);
        println!("global sum single-threaded: {}", no_thread_sum);
    }).unwrap();
}

Scoped线程允许您传入一个保证比线程更长的引用。然后你可以直接使用线程的返回值,跳过通道(这很好,这里不需要!)。

我跟着How can I generate a random number within a range in Rust?生成随机数。我也将它改为[1,100]范围,因为我认为这就是你的意思。但是,您的原始代码实际上是[-98,100],您也可以这样做。

Iterator::sum用于总结数字的迭代器。

我投入了一些粗略的性能数字的线程工作,忽略了矢量结构,使用Rust 1.34并在发布模式下编译100,000,000个数字:

| threads | time (ns) | relative time (%) |
|---------+-----------+-------------------|
|       1 |  33824667 |            100.00 |
|       2 |  16246549 |             48.03 |
|       3 |  16709280 |             49.40 |
|       4 |  14263326 |             42.17 |
|       5 |  14977901 |             44.28 |
|       6 |  12974001 |             38.36 |
|       7 |  13321743 |             39.38 |
|       8 |  13370793 |             39.53 |

也可以看看:


3
投票

您的所有任务都会获得scale变量的副本。线程1和2都做同样的事情,因为每个都有scale,其值为0,并以与另一个线程相同的方式修改它。线程3和4也是如此。

Rust可以防止您破坏线程安全。如果线程共享scale,则在访问变量时会遇到竞争条件。

请阅读closures,他们解释变量复制部分,以及threading,它解释了何时以及如何在线程之间共享变量。

© www.soinside.com 2019 - 2024. All rights reserved.