我正在尝试跨线程分发数组并让线程并行地对数组的部分进行求和。我希望线程0对元素0 1 2和线程1求和元素3 4 5进行求和。将线程2加到6和7,将线程3加到8和9之和。
我是Rust的新手,但之前用C / C ++ / Java编写过代码。我真的把所有东西都扔到了这个程序的垃圾槽里,我希望能得到一些指导。
对不起我的代码很邋but但是当它是成品时我会把它清理干净。请忽略所有命名不佳的变量/不一致的间距/等。
use std::io;
use std::rand;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread::Thread;
static NTHREADS: usize = 4;
static NPROCS: usize = 10;
fn main() {
let mut a = [0; 10]; // a: [i32; 10]
let mut endpoint = a.len() / NTHREADS;
let mut remElements = a.len() % NTHREADS;
for x in 0..a.len() {
let secret_number = (rand::random::<i32>() % 100) + 1;
a[x] = secret_number;
println!("{}", a[x]);
}
let mut b = a;
let mut x = 0;
check_sum(&mut a);
// serial_sum(&mut b);
// Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
// where `T` is the type of the message to be transferred
// (type annotation is superfluous)
let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let mut scale: usize = 0;
for id in 0..NTHREADS {
// The sender endpoint can be copied
let thread_tx = tx.clone();
// Each thread will send its id via the channel
Thread::spawn(move || {
// The thread takes ownership over `thread_tx`
// Each thread queues a message in the channel
let numTougherThreads: usize = NPROCS % NTHREADS;
let numTasksPerThread: usize = NPROCS / NTHREADS;
let mut lsum = 0;
if id < numTougherThreads {
let mut q = numTasksPerThread+1;
lsum = 0;
while q > 0 {
lsum = lsum + a[scale];
scale+=1;
q = q-1;
}
println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
let mut z = numTasksPerThread;
lsum = 0;
while z > 0 {
lsum = lsum + a[scale];
scale +=1;
z = z-1;
}
println!("Greater than numToughthreads lsum: {}", lsum);
}
// Sending is a non-blocking operation, the thread will continue
// immediately after sending its message
println!("thread {} finished", id);
thread_tx.send(lsum).unwrap();
});
}
// Here, all the messages are collected
let mut globalSum = 0;
let mut ids = Vec::with_capacity(NTHREADS);
for _ in 0..NTHREADS {
// The `recv` method picks a message from the channel
// `recv` will block the current thread if there no messages available
ids.push(rx.recv());
}
println!("Global Sum: {}", globalSum);
// Show the order in which the messages were sent
println!("ids: {:?}", ids);
}
fn check_sum (arr: &mut [i32]) {
let mut sum = 0;
let mut i = 0;
let mut size = arr.len();
loop {
sum += arr[i];
i+=1;
if i == size { break; }
}
println!("CheckSum is {}", sum);
}
到目前为止,我已经做到了这么多。无法弄清楚为什么线程0和1具有相同的总和以及2和3做同样的事情:
-5
-49
-32
99
45
-65
-64
-29
-56
65
CheckSum is -91
Greater than numTough lsum: -54
thread 2 finished
Less than numTough lsum: -86
thread 1 finished
Less than numTough lsum: -86
thread 0 finished
Greater than numTough lsum: -54
thread 3 finished
Global Sum: 0
ids: [Ok(-86), Ok(-86), Ok(-54), Ok(-54)]
我设法使用下面的代码重写它以使用偶数。
while q > 0 {
if id*s+scale == a.len() { break; }
lsum = lsum + a[id*s+scale];
scale +=1;
q = q-1;
}
println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
let mut z = numTasksPerThread;
lsum = 0;
let mut scale = 0;
while z > 0 {
if id*numTasksPerThread+scale == a.len() { break; }
lsum = lsum + a[id*numTasksPerThread+scale];
scale = scale + 1;
z = z-1;
}
欢迎来到Rust! :)
是的,起初我没有意识到每个线程都有自己的规模副本
不仅!它也有自己的a
副本!
您要做的事情可能类似于以下代码。我想你可以更容易地看到一个完整的工作示例,因为你似乎是一个Rust初学者并且需要指导。我故意用[i32; 10]
取代Vec
,因为Vec
不是隐含的Copy
able。它需要一个明确的clone()
;我们不能意外复制它。请注意所有较大和较小的差异。代码也有一些功能(更少mut
)。我评论了大部分值得注意的事情:
extern crate rand;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
const NTHREADS: usize = 4; // I replaced `static` by `const`
// gets used for *all* the summing :)
fn sum<I: Iterator<Item=i32>>(iter: I) -> i32 {
let mut s = 0;
for x in iter {
s += x;
}
s
}
fn main() {
// We don't want to clone the whole vector into every closure.
// So we wrap it in an `Arc`. This allows sharing it.
// I also got rid of `mut` here by moving the computations into
// the initialization.
let a: Arc<Vec<_>> =
Arc::new(
(0..10)
.map(|_| {
(rand::random::<i32>() % 100) + 1
})
.collect()
);
let (tx, rx) = mpsc::channel(); // types will be inferred
{ // local scope, we don't need the following variables outside
let num_tasks_per_thread = a.len() / NTHREADS; // same here
let num_tougher_threads = a.len() % NTHREADS; // same here
let mut offset = 0;
for id in 0..NTHREADS {
let chunksize =
if id < num_tougher_threads {
num_tasks_per_thread + 1
} else {
num_tasks_per_thread
};
let my_a = a.clone(); // refers to the *same* `Vec`
let my_tx = tx.clone();
thread::spawn(move || {
let end = offset + chunksize;
let partial_sum =
sum( (&my_a[offset..end]).iter().cloned() );
my_tx.send(partial_sum).unwrap();
});
offset += chunksize;
}
}
// We can close this Sender
drop(tx);
// Iterator magic! Yay! global_sum does not need to be mutable
let global_sum = sum(rx.iter());
println!("global sum via threads : {}", global_sum);
println!("global sum single-threaded: {}", sum(a.iter().cloned()));
}
使用像crossbeam这样的箱子你可以写下这段代码:
use rand::distributions::{Distribution, Uniform}; // 0.6.5
use crossbeam; // 0.7.1
const NTHREADS: usize = 4;
fn random_vec(length: usize) -> Vec<i32> {
let step = Uniform::new_inclusive(1, 100);
let mut rng = rand::thread_rng();
step.sample_iter(&mut rng).take(length).collect()
}
fn main() {
let numbers = random_vec(10);
let num_tasks_per_thread = numbers.len() / NTHREADS;
crossbeam::scope(|scope| {
// The `collect` is important to eagerly start the threads!
let threads: Vec<_> = numbers
.chunks(num_tasks_per_thread)
.map(|chunk| scope.spawn(move |_| chunk.iter().cloned().sum::<i32>()))
.collect();
let thread_sum: i32 = threads.into_iter().map(|t| t.join().unwrap()).sum();
let no_thread_sum: i32 = numbers.iter().cloned().sum();
println!("global sum via threads : {}", thread_sum);
println!("global sum single-threaded: {}", no_thread_sum);
}).unwrap();
}
Scoped线程允许您传入一个保证比线程更长的引用。然后你可以直接使用线程的返回值,跳过通道(这很好,这里不需要!)。
我跟着How can I generate a random number within a range in Rust?生成随机数。我也将它改为[1,100]范围,因为我认为这就是你的意思。但是,您的原始代码实际上是[-98,100],您也可以这样做。
Iterator::sum
用于总结数字的迭代器。
我投入了一些粗略的性能数字的线程工作,忽略了矢量结构,使用Rust 1.34并在发布模式下编译100,000,000个数字:
| threads | time (ns) | relative time (%) |
|---------+-----------+-------------------|
| 1 | 33824667 | 100.00 |
| 2 | 16246549 | 48.03 |
| 3 | 16709280 | 49.40 |
| 4 | 14263326 | 42.17 |
| 5 | 14977901 | 44.28 |
| 6 | 12974001 | 38.36 |
| 7 | 13321743 | 39.38 |
| 8 | 13370793 | 39.53 |
也可以看看: