I am learning MPI4Py and want to implement a simple program.
Explanation
Here, each rank has a send_array of size rank+1 whose values are all equal to rank+1.
rank0 = [1]
rank1 = [2 2]
rank2 = [3 3 3]
rank3 = [4 4 4 4]
I want to gather the values on rank=0 into the buffer rbuf. Its size equals the total size of all local send_arrays, i.e. 1+2+3+4 = 10.
Program
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
send_array = np.ones(rank+1).astype(int) * (rank + 1)
print(rank, send_array)
if rank == 0:
    gather_size = np.array([sum([i+1 for i in range(size)])])
    print(gather_size)
    rbuf = np.zeros(gather_size[0]).astype(int)
else:
    gather_size = None
    rbuf = None
# comm.Gatherv(sendbuf, recvbuf=(recvbuf, recvcounts, displs, datatype), root=0)
comm.Gatherv(sendbuf=send_array, recvbuf=(rbuf, (1,2,3,4),(0,1,3,6), MPI.INT), root=0)
if rank == 0:
    print(rbuf, len(rbuf))
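Observation/Error/Doubt
I hardcoded the values manually to make them explicit, because I have doubts about the values of recvcounts and displs. I get the following error: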
comm.Gatherv(sendbuf=send_array, recvbuf=(rbuf, (1,2,3,4),(0,1,3,6), MPI.INT), root=0)
File "mpi4py/MPI/Comm.pyx", line 724, in mpi4py.MPI.Comm.Gatherv
mpi4py.MPI.Exception: MPI_ERR_TRUNCATE: message truncated
The result I expect is:
[1 2 2 3 3 3 4 4 4 4]
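For reference, here is a minimal pure-Python sketch of how I understand recvcounts and displs to produce that layout: each count is the number of elements a rank contributes, and each displacement is the running sum of the counts before it. The helpers `displacements` and `gatherv_model` are hypothetical names used only for illustration; this models the placement rule and does not call MPI.

```python
from itertools import accumulate

def displacements(counts):
    """Offset of each rank's block: the sum of all earlier counts."""
    return [0] + list(accumulate(counts))[:-1]

def gatherv_model(chunks, counts, displs):
    """Place each rank's chunk at its displacement in one flat buffer."""
    total = max(d + c for c, d in zip(counts, displs))
    rbuf = [0] * total
    for chunk, c, d in zip(chunks, counts, displs):
        rbuf[d:d + c] = chunk[:c]
    return rbuf

size = 4
# Rank r holds r+1 copies of the value r+1, as in the question.
chunks = [[r + 1] * (r + 1) for r in range(size)]
counts = [len(c) for c in chunks]
displs = displacements(counts)
result = gatherv_model(chunks, counts, displs)

print(counts, displs)   # [1, 2, 3, 4] [0, 1, 3, 6]
print(result)           # [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
```

So the hardcoded (1,2,3,4) and (0,1,3,6) look right for 4 ranks, at least as far as the layout goes.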
After playing around a bit, I also realized that the code works if I keep the size of rbuf equal to [maximum_local_array_size*number_of_ranks], which in this case is 4*4, because I have 4 ranks.
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# send_array = np.ones(10).astype(int) * (rank + 1)
send_array = np.ones(rank+1).astype(int) * (rank + 1)
print(rank, send_array)
if rank == 0:
    gather_size = np.array([16])
    print(gather_size)
    rbuf = np.zeros(gather_size[0]).astype(int)
else:
    gather_size = None
    rbuf = None
# comm.Gatherv(sendbuf, recvbuf=(recvbuf, recvcounts, displs, datatype), root=0)
comm.Gatherv(sendbuf=send_array, recvbuf=(rbuf, MPI.INT), root=0)
if rank == 0:
    print(rbuf, len(rbuf))
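The result I get is not what I want, but I do not get any error. Note that this time I did not specify recvcounts and displs either, so I am not sure what values these parameters take by default, since they seem to work.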
[1 0 0 0 2 2 0 0 3 3 3 0 4 4 4 4]
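My reading of this output, which is an inference from the result and not a statement about mpi4py internals, is that when the counts and displacements are omitted the receive buffer is divided into equal blocks, one per rank, and each rank's data is written at the start of its block. The sketch below models that assumption in plain Python; `equal_block_layout` is a hypothetical helper.

```python
def equal_block_layout(buffer_len, nranks):
    """Assumed default: split the buffer into nranks equal blocks."""
    block = buffer_len // nranks
    counts = [block] * nranks
    displs = [r * block for r in range(nranks)]
    return counts, displs

nranks = 4
chunks = [[r + 1] * (r + 1) for r in range(nranks)]

counts, displs = equal_block_layout(16, nranks)
rbuf = [0] * 16
for chunk, d in zip(chunks, displs):
    # Each rank fills only the first len(chunk) slots of its block.
    rbuf[d:d + len(chunk)] = chunk

print(counts, displs)   # [4, 4, 4, 4] [0, 4, 8, 12]
print(rbuf)             # [1, 0, 0, 0, 2, 2, 0, 0, 3, 3, 3, 0, 4, 4, 4, 4]
```

If that is what happens, it would also explain why the buffer has to be maximum_local_array_size*number_of_ranks: every block must be large enough for the largest sender.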
Here is an example of how to specify the receive counts and displacements manually:
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
sbuf = np.ones(rank+1).astype(int) * (rank + 1)
def triangular(n):
    return n * (n+1) // 2
if rank == 0:
    gather_size = triangular(size)
    rcounts = np.arange(1, size+1)
    rdispls = np.array([triangular(i) for i in range(size)])
    rbuf = np.zeros(gather_size, dtype=int)
else:
    gather_size = None
    rbuf = None
    rcounts = None
    rdispls = None
comm.Gatherv(sendbuf=sbuf, recvbuf=(rbuf, (rcounts, rdispls)), root=0)
if rank == 0:
    print(rbuf, len(rbuf))
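A possible explanation for why this version runs while the hardcoded one raised MPI_ERR_TRUNCATE, assuming a platform where `.astype(int)` gives 8-byte integers and `MPI.INT` is a 4-byte C int: the call in the question declared the receive slots as `MPI.INT`, so each slot could hold only half the bytes the sender delivered. The call above leaves the datatype out, so it is taken from the arrays themselves. The sketch below only does the byte accounting with the standard library; `truncated_ranks` is a hypothetical helper, not part of mpi4py.

```python
import ctypes

C_INT_BYTES = ctypes.sizeof(ctypes.c_int)     # size of the C int that MPI.INT describes
INT64_BYTES = ctypes.sizeof(ctypes.c_int64)   # assumed itemsize of the NumPy arrays

def truncated_ranks(recvcounts, send_lengths, send_itemsize, recv_itemsize):
    """Return the ranks whose message is larger than the slot reserved for it."""
    bad = []
    for r, (n_recv, n_send) in enumerate(zip(recvcounts, send_lengths)):
        if n_send * send_itemsize > n_recv * recv_itemsize:
            bad.append(r)
    return bad

counts = (1, 2, 3, 4)

# Receive side declared as 4-byte ints, senders deliver 8-byte ints.
mismatch = truncated_ranks(counts, counts, INT64_BYTES, C_INT_BYTES)
# Receive side uses the same 8-byte type as the senders.
matched = truncated_ranks(counts, counts, INT64_BYTES, INT64_BYTES)

print(mismatch)  # every rank overflows its slot
print(matched)   # no rank overflows
```

If this is indeed the cause, the hardcoded counts and displacements from the question were fine, and the datatype was the part that needed to match the arrays.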