Dataloader cannot send data to multiple GPUs

Problem description

I am training a model with PyTorch Lightning. Training on a single GPU works fine, but when I switch to devices=2 with strategy='ddp' in pl.Trainer, the code crashes because one GPU never receives any batches. I don't know why this happens or how to fix it.

Data module code:

from typing import Any, Dict
import numpy as np
from pathlib import Path

from torch.utils.data import DataLoader
from monai.data import Dataset
from monai.transforms import (
    ConcatItemsd,
    RandAffined,
    RandFlipd,
    Compose,
    LoadImaged,
    EnsureTyped,
    EnsureChannelFirstd,
    RandGaussianNoised,
    RandScaleIntensityd,
    RandStdShiftIntensityd,
)
import pytorch_lightning as pl

import codebase.terminology as term

_TRANSFORM_DICT = {'flip': {'p': 0.5, 'axes': (0, 1)},
                   # rotation range has to consider whether the channel dim exists or not,
                   # because the transform assumes there is no channel dim
                   'rotate': {'radians': [0.5, 0.5, 0.0], 'p': 0.8},
                   'affine': {'p': 0.5, 'degrees': 0.5, 'translation': 0.3}}


class MedicalImageDataModule(pl.LightningDataModule):
    """Image Data Module"""
    def __init__(self, task_type: term.ProblemType,
                 config: Dict[str, Any],
                 transform_dict: Dict[str, Any] = _TRANSFORM_DICT):
        super().__init__()
        self.task_type = task_type
        self.configs = config
        self.task = self.configs['experiment']['name']
        self.spatial_size = self.configs['model']['spatial_size']
        self.train_batch_size = self.configs['train']['batch_size']
        self.valid_batch_size = self.configs['valid']['batch_size']
        self.test_batch_size = self.configs['test']['batch_size']
        self.train_num_workers = self.configs['train']['num_workers']
        self.valid_num_workers = self.configs['valid']['num_workers']
        self.include_test = self.configs['test']['include']
        self.base_dir = Path(self.configs['experiment']['data_path'])
        self.train_ids = []
        self.valid_ids = []
        self.test_ids = []
        self.transform_dict = transform_dict
        self.train_transform = None
        self.valid_transform = None
        self.train_set: Dataset
        self.val_set: Dataset
        self.test_set: Dataset

    def get_data_list(self):
        """Gets the lists of image ids for train and validation."""
        file_names = (self.base_dir / 'train' / 'images').glob('*__CT.nii.gz')
        train_ids = [file_name.stem.split('__')[0] for file_name in file_names]
        file_names = (self.base_dir / 'valid' / 'images').glob('*__CT.nii.gz')
        valid_ids = [file_name.stem.split('__')[0] for file_name in file_names]
        print(f'Locating data in {self.base_dir}: {len(train_ids)} for train'
              f' and {len(valid_ids)} for validation')
        return train_ids, valid_ids

    def get_test_data_list(self):
        """Gets the lists of image ids for test."""
        file_names = (self.base_dir / 'test' / 'images').glob('*__CT.nii.gz')
        test_ids = [file_name.stem.split('__')[0] for file_name in file_names]
        return test_ids

    def prepare_data(self):
        """Loads image ids."""
        self.train_ids, self.valid_ids = self.get_data_list()
        if self.include_test:
            self.test_ids = self.get_test_data_list()

    def setup(self, stage=None):
        """Sets up data."""
        self.train_transform, self.valid_transform = self.get_augmentation_transform(self.transform_dict)

        train_files = [{'CT': str(self.base_dir / 'train' / 'images' / (id + '__CT.nii.gz')),
                        'PT': str(self.base_dir / 'train' / 'images' / (id + '__PT.nii.gz')),
                        'label': str(self.base_dir / 'train' / 'labels' / (id + '.nii.gz'))
                        } for id in self.train_ids]
        self.train_set = Dataset(data=train_files, transform=self.train_transform)
        print(f'Size of train dataset: {len(self.train_set)}')

        valid_files = [{'CT': str(self.base_dir / 'valid' / 'images' / (id + '__CT.nii.gz')),
                        'PT': str(self.base_dir / 'valid' / 'images' / (id + '__PT.nii.gz')),
                        'label': str(self.base_dir / 'valid' / 'labels' / (id + '.nii.gz'))
                        } for id in self.valid_ids]
        self.val_set = Dataset(data=valid_files, transform=self.valid_transform)
        print(f'Size of validation dataset: {len(self.val_set)}')

        if self.include_test:
            self.test_ids = self.get_test_data_list()
            test_files = [{'CT': str(self.base_dir / 'test' / 'images' / (id + '__CT.nii.gz')),
                           'PT': str(self.base_dir / 'test' / 'images' / (id + '__PT.nii.gz')),
                           'label': str(self.base_dir / 'test' / 'labels' / (id + '.nii.gz'))
                           } for id in self.test_ids]
            self.test_set = Dataset(data=test_files, transform=self.valid_transform)
            print(f'Size of test dataset: {len(self.test_set)}')

    def get_augmentation_transform(self, transform_dict: Dict[str, Any]):
        """Gets augumentation transforms."""
        train_augmentation = Compose(
            [
                LoadImaged(keys=['CT', 'PT', 'label'], image_only=False),
                EnsureChannelFirstd(keys=['CT', 'PT', 'label']),
                RandGaussianNoised(keys=['CT']),
                RandStdShiftIntensityd(keys=['CT'], factors=0.2),
                RandScaleIntensityd(keys=['CT'], factors=0.1),
                RandFlipd(keys=['CT', 'PT', 'label'], prob=transform_dict['flip']['p'],
                          spatial_axis=transform_dict['flip']['axes']),
                EnsureTyped(keys=['CT', 'PT', 'label']),  # Note: label not in one-hot form
                # AsDiscreted(keys=['label'], to_onehot=self.configs['metric']['num_classes'])
                ConcatItemsd(keys=['CT', 'PT'], name="input", dim=0)
            ]
        )

        valid_augmentation = Compose(
            [
                LoadImaged(keys=['CT', 'PT', 'label'], image_only=False),
                EnsureChannelFirstd(keys=['CT', 'PT', 'label']),
                # AsDiscreted(keys=['label'], to_onehot=self.configs['metric']['num_classes'])
                ConcatItemsd(keys=['CT', 'PT'], name="input", dim=0)
            ]
        )
        return train_augmentation, valid_augmentation

    def train_dataloader(self):
        # p_dataset = PatchDataset(self.train_set, patch_func=lambda x: x,  # type: ignore
        #                          samples_per_image=self.configs['train']['samples_per_volume'])
        dataloader = DataLoader(self.train_set, batch_size=self.train_batch_size,
                                num_workers=self.train_num_workers, shuffle=True)
        print(f'Train dataloader length: {len(dataloader)}')
        if len(dataloader) == 0:
            raise ValueError('No train data batch available.')
        return dataloader

    def val_dataloader(self):
        dataloader = DataLoader(self.val_set, batch_size=self.valid_batch_size,
                                num_workers=self.valid_num_workers, shuffle=False)
        print(f'Validation dataloader length: {len(dataloader)}')
        if len(dataloader) == 0:
            raise ValueError('No validation data batch available.')
        return dataloader

    def test_dataloader(self):
        dataloader = DataLoader(self.test_set, batch_size=self.test_batch_size,
                                num_workers=self.valid_num_workers, shuffle=False)
        print(f'Test dataloader length: {len(dataloader)}')
        if len(dataloader) == 0:
            raise ValueError('No test data batch available.')
        return dataloader

Trainer code:

trainer = pl.Trainer(accelerator="gpu",
                         devices=FLAGS.num_devices,
                         # num_nodes=FLAGS.num_nodes,
                         logger=logger,
                         max_epochs=max_epochs,
                         check_val_every_n_epoch=1,
                         # precision=16,
                         num_sanity_val_steps=0,
                         enable_model_summary=True,
                         enable_progress_bar=True,
                         log_every_n_steps=config['train']['logging_frequency_steps'],
                         callbacks=callbacks,
                         strategy=FLAGS.strategy
                         )

Error message:

2023-08-21 20:22:25.522017: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations. To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
2023-08-21 20:22:27.490978: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1960] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Skipping registering GPU devices...
Max epochs: 300
Use Dice CrossEntropy loss
Starting learning rate: 0.0001
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Locating data in /workspace/data/processed_128x128: 1830 for train and 390 for validation
Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/2
2023-08-21 20:22:33.722437: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations. To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-21 20:22:34.528897: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
2023-08-21 20:22:35.020353: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1960] Cannot dlopen some GPU libraries. Skipping registering GPU devices...
Max epochs: 300
Use Dice CrossEntropy loss
Starting learning rate: 0.0001
Initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/2
I0821 20:22:36.928924 139652776452736 distributed_c10d.py:442] Added key: store_based_barrier_key:1 to store for rank: 1
I0821 20:22:36.938210 139831611515520 distributed_c10d.py:442] Added key: store_based_barrier_key:1 to store for rank: 0
I0821 20:22:36.938375 139831611515520 distributed_c10d.py:476] Rank 0: Completed store-based barrier for key:store_based_barrier_key:1 with 2 nodes.
----------------------------------------------------------------------------------------------------
distributed_backend=nccl
All distributed processes registered. Starting with 2 processes
----------------------------------------------------------------------------------------------------
I0821 20:22:36.939174 139652776452736 distributed_c10d.py:476] Rank 1: Completed store-based barrier for key:store_based_barrier_key:1 with 2 nodes.
You are using a CUDA device ('NVIDIA A100-SXM4-80GB') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance.
**Size of train dataset: 0
Size of validation dataset: 0
Size of train dataset: 1830
Size of validation dataset: 390**
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2,3]
LOCAL_RANK: 1 - CUDA_VISIBLE_DEVICES: [0,1,2,3]
...

File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/sampler.py", line 107, in __init__
    raise ValueError("num_samples should be a positive integer " ValueError: num_samples should be a positive integer value, but got num_samples=0
deep-learning pytorch pytorch-lightning
1 Answer

Found the problem: it was in prepare_data(). In PyTorch Lightning, prepare_data() is called only on the main process, so I should not set state in that call, because that state is then only available in a single process. I moved those lines into setup() and the problem was solved.
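
A minimal sketch of that change, keeping the rest of the data module from the question unchanged (the ID-loading simply moves from prepare_data() into setup(), which runs on every DDP rank):

    def prepare_data(self):
        """Called on a single process only: do one-off work (e.g. downloads) here,
        but do not assign state to self -- the other ranks will never see it."""

    def setup(self, stage=None):
        """Called on every process, so state assigned here exists on every rank."""
        self.train_ids, self.valid_ids = self.get_data_list()
        if self.include_test:
            self.test_ids = self.get_test_data_list()

        self.train_transform, self.valid_transform = self.get_augmentation_transform(self.transform_dict)
        # ... then build self.train_set / self.val_set / self.test_set exactly as in the question ...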
