我正在尝试构建一个 MPI 程序,该程序从数量减少的处理器发送两种类型的消息,以使用非阻塞发送来处理 A(该程序是随机的,我使用标签来区分消息属于哪种类型。第一个类型的消息用于发送数据向量,第二种类型用于告诉进程不再发送消息,代码如下所示:
void send_data() {
/*... initialization of variables ...*/
MPI_Isend( loc_buffer1.data(), //Address of the message we are sending.
bufferlen, //Number of elements handled by that address.
MPI_INT, //MPI_TYPE of the message we are sending.
new_proc, //Rank of receiving process
1, //Message Tag
MPI_COMM_WORLD, //MPI Communicator
&request1 ); }
}
void check_for_criteria() {
if (criteria_met) { send_data(); }
// Telling the target process there is no more data to be sent
MPI_Isend( NULL, //Address of the message we are sending.
0, //Number of elements handled by that address.
MPI_CHAR, //MPI_TYPE of the message we are sending.
new_proc, //Rank of receiving process
2, //Message Tag
MPI_COMM_WORLD, //MPI Communicator
&request1 ); }
}
void receive_parallel_comm_helper(...) {
int test_flag1 = 0;
int test_flag2 = 0;
MPI_Status status1;
MPI_Status status2;
std::vector<int> new_loc_buffer1(9);
std::vector<bool> stop_par_Array(num_procs, 0);
stop_par_Array[rank] = 1;
bool stop_par = 0;
while ((!stop_par)) {
for (int i=0; i<num_procs; i++) {
if (i != rank) {
stop_par = 1;
status1.MPI_TAG = 0;
status2.MPI_TAG = 0;
test_flag1 = 0;
test_flag2 = 0;
MPI_Iprobe(i, 1, MPI_COMM_WORLD, &test_flag1, &status1);
if ((status1.MPI_TAG == 1) (test_flag1)) {
MPI_Recv(new_loc_buffer1.data(), 9, MPI_INT, i, 1, MPI_COMM_WORLD, &status1);
}
test_flag2 = 0;
MPI_Iprobe(MPI_ANY_SOURCE, 2, MPI_COMM_WORLD, &test_flag2, &status2);
if ((status2.MPI_TAG == 2) && (test_flag2)) {
MPI_Recv(NULL, 0, MPI_CHAR, status2.MPI_SOURCE, par_done_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
stop_par_Array[status2.MPI_SOURCE] = 1;
}
for (int j = 0; j < num_procs; j++) {
stop_par &= stop_par_Array[j];
}
}
}
}
MPI_Barrier(MPI_COMM_WORLD);
}
void main_function() {
while (some_criteria) {
...
check_for_criteria();
MPI_Barrier(MPI_COMM_WORLD);
receive_parallel_comm_helper();
...
}
}
本质上,当调用 main_function() 时,每个进程都会检查一些标准,以确定是否应该将数据发送到另一个进程。无论是否发送数据,都会发送一个空消息来表示发送过程完成。然后,每个进程进入 receive_parallel_comm_helper() 函数,一个 while 循环在等待接收时旋转: 1. 数据和 2. 指示从该处理器发送的数据已完成的空消息。但是,我目前没有获得所需的行为 - 数据经常被发送,但在 main_function() while 循环的后续迭代中被接收,因为由于某种原因首先收到空消息。我知道消息顺序是由 MPI 保证的,但此代码依赖于这样的假设:对于具有不同标签的消息,消息发送是顺序的 - 情况不是这样吗?如果不是,如何更正此代码?
即使 MPI 确实维护不同标签的顺序排队,您也会遇到竞争条件,因为您确实对类型 1 和 2 进行了 separate
Iprobe
调用:
|时间 T |发件人 |接收器 |
| - | - | - |
| 1 | | Iprobe 1 - 否
| 2 |发送 1 | |
| 3 |发送 2 | |
| 4 | | Iprobe 2 - 是的
| 5 | |接收 2 |
| 6 | | Iprobe 1 - 是
| 7 | |接收 1 |
一种可能的修复方法是在
MPI_ANY_TAG
中使用 Iprobe
而不是 1 或 2。
这将解决问题如果 MPI 确实维持不同标签的顺序排队。
但是,我对 MPI 不太了解,不知道它是否进行顺序排队。您可以通过修改代码以仅执行一个
Iprobe
和前面提到的 MPI_ANY_TAG
来测试这一点。
另一种方法是创建一个通用的消息头结构:
#define MAX_SIZE 2048
struct mymsg {
int msg_type; // message type
unsigned int msg_seqno; // sequence number
unsigned int msg_count; // number of elements
union {
unsigned int msg_1[MAX_SIZE]; // type 1 data
unsigned float msg_2[MAX_SIZE]; // type 2 data
};
};
然后,始终使用 single 标签值(例如 1)发送,并根据结构中的
msg_type
字段解码数据。
有了这样的消息结构,您还可以使用不同的 MPI 标签,但将消息保存在本地队列中如果您得到的序列号不连续。
也就是说,如果您得到如下序列:
1, 4, 2, 5, 3
:
粗略地说,这就是网络软件如何重新组装被分割且各段到达无序的 TCP 消息的方式。