In torch, how to correctly average gradients on different GPUs?


In torch.distributed, how to correctly average gradients on different GPUs?

The code below, modified from https://github.com/seba-1511/dist_tuto.pth/blob/gh-pages/train_dist.py, can successfully use two GPUs (which can be checked with nvidia-smi).

What is hard to understand, though, is whether 'average_gradients(model)' below really is the correct way to average the gradients of the two models on the two GPUs. In the code below, the two 'model = Net()' instances created by the two processes are two models on two different GPUs, but the line 'average_gradients(model)' appears to average only the gradients of the model on one GPU, not of the models on both GPUs.

The question is: is the code below really the correct way to average gradients across the two GPUs? If it is, how should the code be read and understood? If it is not, what is the correct way to average the gradients of the two models below?

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from math import ceil
from random import Random
from torch.multiprocessing import Process
from torchvision import datasets, transforms

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"


class Partition(object):
    """ Dataset-like object, but only access a subset of it. """

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):
    """ Partitions a dataset into different chunks. """

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])


class Net(nn.Module):
    """ Network architecture. """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


def partition_dataset():
    """ Partitioning MNIST """
    dataset = datasets.MNIST(
        './data',
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ]))
    size = dist.get_world_size()
    bsz = int(256 / float(size))
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(
        partition, batch_size=bsz, shuffle=True)
    return train_set, bsz


def average_gradients(model):
    """ Gradient averaging. """
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
        param.grad.data /= size


def run(rank, size):
    """ Distributed Synchronous SGD Example """
    # print("107 size =", size)
    # print("dist.get_world_size() =", dist.get_world_size())  ## 2

    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    device = torch.device("cuda:{}".format(rank))

    model = Net()
    model = model.to(device)

    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    num_batches = ceil(len(train_set.dataset) / float(bsz))

    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            # data, target = Variable(data), Variable(target)
            # data, target = Variable(data.cuda(rank)), Variable(target.cuda(rank))
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ', epoch, ': ', epoch_loss / num_batches)
        # if epoch == 4:
        #     from utils import module_utils
        #     module_utils.save_model()


def init_processes(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
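A small standalone sketch (illustrative values, gloo backend, and a separate port so it does not clash with the training script) can be used to check what all_reduce actually does across the two processes: every rank that enters the call contributes its tensor and receives the combined result.

# check_all_reduce.py -- illustrative sketch, separate from the training script
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def check(rank, size):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29501'
    dist.init_process_group('gloo', rank=rank, world_size=size)
    # each process holds a different value, just as each GPU holds its own gradients
    t = torch.tensor([float(rank + 1)])
    dist.all_reduce(t, op=dist.ReduceOp.SUM)           # collective: both ranks end up with 1 + 2 = 3
    print('rank', rank, 'average =', t.item() / size)  # prints 1.5 on both ranks

if __name__ == '__main__':
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=check, args=(rank, size))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
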
python pytorch torch multi-gpu
1 Answer

My solution is to use DistributedDataParallel instead of DataParallel, as shown below.

The code

for param in self.model.parameters():
    torch.distributed.all_reduce(param.grad.data)

works successfully.
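For comparison, the same gradient synchronization can be delegated to torch.nn.parallel.DistributedDataParallel, which all-reduces and averages gradients automatically during backward(). A minimal sketch (assuming the process group has already been initialized in this process, and reusing Net from the question):

# sketch: letting DistributedDataParallel do the synchronization
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

local_rank = dist.get_rank()                      # one process per GPU on a single node
device = torch.device("cuda", local_rank)

model = Net().to(device)
ddp_model = DDP(model, device_ids=[local_rank])   # gradients are averaged across ranks in backward()

optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01, momentum=0.5)

# inside the training loop no manual all_reduce is needed:
#   output = ddp_model(data)
#   loss = F.nll_loss(output, target)
#   loss.backward()        # gradients are all-reduced and averaged here
#   optimizer.step()
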

class DDPOptimizer:
    def __init__(self, model, torch_optim=None, learning_rate=None):
        """
        :param parameters:
        :param torch_optim: like torch.optim.Adam(parameters, lr=learning_rate, eps=1e-9)
            or optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
        :param is_ddp:
        """
        if torch_optim is None:
            torch_optim = torch.optim.Adam(model.parameters(), lr=3e-4, eps=1e-9)

        if learning_rate is not None:
            # changing defaults alone does not affect existing param groups,
            # so override the learning rate on each group directly
            for group in torch_optim.param_groups:
                group["lr"] = learning_rate

        self.model = model
        self.optimizer = torch_optim

    def optimize(self, loss):
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.model.parameters():
            torch.distributed.all_reduce(param.grad.data)
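            # note: all_reduce with the default SUM op adds up the gradients from all ranks;
            # divide param.grad.data by dist.get_world_size() here for a true average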

        self.optimizer.step()

def run():
    """ Distributed Synchronous SGD Example """

    module_utils.initialize_torch_distributed()
    start = time.time()

    train_set, bsz = partition_dataset()
    model = Net()

    local_rank = torch.distributed.get_rank()
    device = torch.device("cuda", local_rank)
    model = model.to(device)

    sgd = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    optimizer = DDPOptimizer(model, torch_optim=sgd)

    # optimizer = NoamOptimizerDistributed(100, 1, 10, model)

    num_batches = math.ceil(len(train_set.dataset) / float(bsz))

    epoch, end_epoch = 1, 10

    while epoch <= end_epoch:
        epoch_loss = 0.0
        for data, target in train_set:
            data, target = data.to(device), target.to(device)

            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            optimizer.optimize(loss)

        print('Rank ', dist.get_rank(), ', epoch ', epoch, ': ', epoch_loss / num_batches)
        # if epoch % 6 == 0:
        #     if local_rank == 0:
        #         module_utils.save_model(model, "a.pt")
        epoch += 1

    print("Time take to train: ", time.time() - start)