I use an MPI program to parallelize batch processing.
We image multi-dimensional MRI data (3 spatial dimensions, coil data, ...), and several of these images are stacked along a batch dimension.
The program should apply the same task to each entry of the batch dimension.
My problem: I now have nproc processes (COMM_SIZE), but my batch dimension is not divisible by nproc. MPI_Scatter, or more precisely MPI_Gather, then blocks the application.
My question: is there a more elegant/convenient way to overcome this than a for loop with MPI_Send/MPI_Recv?
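For clarity, the fallback I have in mind is roughly the following (a minimal sketch, not my actual code; BATCH_SIZE, ITEM_LEN and the per-entry "task" are made-up placeholders): the root hands out one batch entry at a time with point-to-point calls, so a remainder needs no special case, but everything goes through explicit MPI_Send/MPI_Recv.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define BATCH_SIZE 5   // hypothetical batch dimension, not divisible by nproc
#define ITEM_LEN   4   // hypothetical number of doubles per batch entry

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    double *batch = NULL;
    if (rank == 0) {
        batch = malloc(sizeof(double) * BATCH_SIZE * ITEM_LEN);
        for (int i = 0; i < BATCH_SIZE * ITEM_LEN; i++) batch[i] = (double)i;
    }
    double item[ITEM_LEN];
    // Batch entry b is owned by rank b % size; ranks simply skip entries they
    // do not own, so a batch size that is not divisible by nproc is harmless.
    for (int b = 0; b < BATCH_SIZE; b++) {
        int owner = b % size;
        if (rank == 0 && owner != 0) {
            MPI_Send(batch + b * ITEM_LEN, ITEM_LEN, MPI_DOUBLE, owner, b, MPI_COMM_WORLD);
        } else if (rank == owner) {
            if (rank == 0) {
                for (int i = 0; i < ITEM_LEN; i++) item[i] = batch[b * ITEM_LEN + i];
            } else {
                MPI_Recv(item, ITEM_LEN, MPI_DOUBLE, 0, b, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            }
            double sum = 0.0;                         // stand-in for the real per-entry task
            for (int i = 0; i < ITEM_LEN; i++) sum += item[i];
            printf("rank %d processed batch entry %d (sum %f)\n", rank, b, sum);
        }
    }
    if (rank == 0) free(batch);
    MPI_Finalize();
    return 0;
}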
I found this answer, but IMHO it explains a different problem, because I cannot change the amount of data that is sent/received with MPI_Scatterv. Please prove me wrong :)
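For reference, my reading of the uneven split in the linked answer boils down to something like this (a minimal sketch with made-up sizes; TOTAL_ELEMS and the buffer names are placeholders), where sendcounts and displacements are specified per rank:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define TOTAL_ELEMS 10   // hypothetical total number of doubles held by the root

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    // Per-rank counts: the first TOTAL_ELEMS % size ranks get one extra element.
    int *counts = malloc(sizeof(int) * size);
    int *displs = malloc(sizeof(int) * size);
    for (int r = 0, off = 0; r < size; r++) {
        counts[r] = TOTAL_ELEMS / size + (r < TOTAL_ELEMS % size ? 1 : 0);
        displs[r] = off;
        off += counts[r];
    }
    double *data = NULL;
    if (rank == 0) {
        data = malloc(sizeof(double) * TOTAL_ELEMS);
        for (int i = 0; i < TOTAL_ELEMS; i++) data[i] = (double)i;
    }
    double *chunk = malloc(sizeof(double) * counts[rank]);
    MPI_Scatterv(data, counts, displs, MPI_DOUBLE,
                 chunk, counts[rank], MPI_DOUBLE, 0, MPI_COMM_WORLD);
    printf("rank %d received %d elements\n", rank, counts[rank]);
    free(chunk); free(counts); free(displs);
    if (rank == 0) free(data);
    MPI_Finalize();
    return 0;
}

In my case every batch entry has the same size and each rank runs the same task per entry, so as far as I can see the per-process amount of data is fixed, which is why I think the linked answer solves a different problem.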
Thanks in advance!
In the example below, mpirun -n 2 tut2 works fine, but mpirun -n 3 tut2 gets stuck right before MPI_Gather: with 3 processes there are world_size + 2 = 5 batch entries, so ranks 0 and 1 run the loop twice while rank 2 runs it only once, and the ranks no longer make the same number of collective calls. The code is adapted from mpitutorial.com.
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <mpi.h>
#include <assert.h>
// Creates an array of random numbers. Each number has a value from 0 - 1
double *create_rand_nums(int num_elements) {
double *rand_nums = (double *)malloc(sizeof(double) * num_elements);
assert(rand_nums != NULL);
int i;
for (i = 0; i < num_elements; i++) {
rand_nums[i] = (rand() / (double)RAND_MAX);
}
return rand_nums;
}
// Computes the average of an array of numbers
double compute_avg(double *array, int num_elements) {
double sum = 0.f;
int i;
for (i = 0; i < num_elements; i++) {
sum += array[i];
}
return sum / num_elements;
}
double compute_sum(double *array, int num_elements) {
double sum = 0.f;
int i;
for (i = 0; i < num_elements; i++) {
sum += array[i];
}
return sum;
}
int main(int argc, char** argv) {
MPI_Init(NULL, NULL);
if (argc != 2) {
fprintf(stderr, "Usage: avg num_elements_per_proc\n");
exit(1);
}
int num_elements_per_proc = atoi(argv[1]);
// Seed the random number generator to get different results each time
srand(time(NULL));
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
char processor_name[MPI_MAX_PROCESSOR_NAME]; // gets the name of the processor
int name_len;
MPI_Get_processor_name(processor_name, &name_len);
// Gather all partial averages down to the root process
double *sub_avgs = NULL;
if (world_rank == 0) {
sub_avgs = (double *)malloc(sizeof(double) * (world_size + 2));
assert(sub_avgs != NULL);
}
double *rand_nums = NULL;
if (world_rank == 0) {
rand_nums = create_rand_nums(num_elements_per_proc * (world_size + 2));
}
double original_data_avg = 0;
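// NOTE: the (world_size + 2) batch entries are distributed round-robin, so when
// (world_size + 2) % world_size != 0 the ranks run this loop a different number of
// times and no longer issue matching MPI_Bcast/MPI_Scatter/MPI_Gather calls.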
for(int i = world_rank; i < (world_size + 2); i += world_size) {
int off = 0;
if (world_rank == 0)
off = world_size;
MPI_Bcast(&off, 1, MPI_INT, 0, MPI_COMM_WORLD);
double *sub_rand_nums = (double *)malloc(sizeof(double) * num_elements_per_proc);
assert(sub_rand_nums != NULL);
MPI_Scatter(rand_nums, num_elements_per_proc, MPI_DOUBLE, sub_rand_nums,
num_elements_per_proc, MPI_DOUBLE, 0, MPI_COMM_WORLD);
// Compute the sum of this rank's subset
double sub_avg = compute_sum(sub_rand_nums, num_elements_per_proc);
printf("Sub avg %s (%d/%d): %f\n", processor_name, world_rank, world_size, sub_avg);
//!!! Here the block appears !!!
// Gather all partial averages down to the root process
MPI_Gather(&sub_avg, 1, MPI_DOUBLE, sub_avgs, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
if (world_rank == 0) {
double avg = compute_sum(sub_avgs, world_size)/(num_elements_per_proc * world_size);
printf("Avg of all elements is %f\n", avg);
// Compute the average across the original data for comparison
original_data_avg +=
compute_avg(rand_nums + off, num_elements_per_proc * world_size);
}
free(sub_rand_nums);
}
// Clean up
if (world_rank == 0) {
printf("Avg computed across original data is %f\n", original_data_avg);
free(rand_nums);
free(sub_avgs);
}
printf("proc %s (%d/%d) signes off\n", processor_name, world_rank, world_size);
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
}