我正在使用 PyTorch Lightning 训练模型。使用单个 GPU 训练时效果很好,但当我在 pl.Trainer 中改为
devices=2
并设置 strategy='ddp'
时,代码崩溃了,因为其中一个 GPU 拿不到任何批次。我不知道原因,也不知道如何解决这个问题。
数据模块代码:
from typing import Any, Dict
import numpy as np
from pathlib import Path
from torch.utils.data import DataLoader
from monai.data import Dataset
from monai.transforms import (
ConcatItemsd,
RandAffined,
RandFlipd,
Compose,
LoadImaged,
EnsureTyped,
EnsureChannelFirstd,
RandGaussianNoised,
RandScaleIntensityd,
RandStdShiftIntensityd,
)
import pytorch_lightning as pl
import codebase.terminology as term
# Default augmentation hyper-parameters consumed by
# MedicalImageDataModule.get_augmentation_transform().
_TRANSFORM_DICT = {'flip': {'p': 0.5, 'axes': (0, 1)},
                   # rotation range has to consider whether the channel exists or not
                   # because the transform assumes no channels
                   'rotate': {'radians': [0.5, 0.5, 0.0], 'p': 0.8},
                   'affine': {'p': 0.5, 'degrees': 0.5, 'translation': 0.3}}
class MedicalImageDataModule(pl.LightningDataModule):
    """Image Data Module for paired CT/PT volumes with segmentation labels.

    Expects the directory layout
        <data_path>/{train,valid,test}/images/<id>__CT.nii.gz
        <data_path>/{train,valid,test}/images/<id>__PT.nii.gz
        <data_path>/{train,valid,test}/labels/<id>.nii.gz

    DDP note: all per-process state (id lists, transforms, datasets) is
    built in ``setup()``, which Lightning calls on EVERY rank.
    ``prepare_data()`` runs on the main process only and is therefore
    kept stateless — assigning attributes there leaves the other ranks
    with empty datasets (the ``num_samples=0`` crash).
    """

    def __init__(self, task_type: term.ProblemType,
                 config: Dict[str, Any],
                 transform_dict: Dict[str, Any] = _TRANSFORM_DICT):
        """Stores configuration; no file-system access happens here.

        Args:
            task_type: problem type enum from codebase.terminology.
            config: nested experiment configuration dictionary.
            transform_dict: augmentation hyper-parameters (see _TRANSFORM_DICT).
        """
        super().__init__()
        self.task_type = task_type
        self.configs = config
        self.task = self.configs['experiment']['name']
        self.spatial_size = self.configs['model']['spatial_size']
        self.train_batch_size = self.configs['train']['batch_size']
        self.valid_batch_size = self.configs['valid']['batch_size']
        self.test_batch_size = self.configs['test']['batch_size']
        self.train_num_workers = self.configs['train']['num_workers']
        self.valid_num_workers = self.configs['valid']['num_workers']
        self.include_test = self.configs['test']['include']
        self.base_dir = Path(self.configs['experiment']['data_path'])
        # Populated in setup() so every DDP rank gets its own copy.
        self.train_ids = []
        self.valid_ids = []
        self.test_ids = []
        self.transform_dict = transform_dict
        self.train_transform = None
        self.valid_transform = None
        self.train_set: Dataset
        self.val_set: Dataset
        self.test_set: Dataset

    def get_data_list(self):
        """Gets the lists of image ids for train and validation."""
        file_names = (self.base_dir / 'train' / 'images').glob('*__CT.nii.gz')
        train_ids = [file_name.stem.split('__')[0] for file_name in file_names]
        file_names = (self.base_dir / 'valid' / 'images').glob('*__CT.nii.gz')
        valid_ids = [file_name.stem.split('__')[0] for file_name in file_names]
        print(f'Locating data in {self.base_dir}: {len(train_ids)} for train'
              f' and {len(valid_ids)} for validation')
        return train_ids, valid_ids

    def get_test_data_list(self):
        """Gets the lists of image ids for test."""
        file_names = (self.base_dir / 'test' / 'images').glob('*__CT.nii.gz')
        return [file_name.stem.split('__')[0] for file_name in file_names]

    def prepare_data(self):
        """Rank-0-only hook (downloads, one-off preprocessing).

        Intentionally does NOT assign any instance state: under DDP this
        hook runs on the main process only, so attributes set here never
        reach the other ranks. All state is populated in setup().
        """

    def _build_file_dicts(self, split: str, ids):
        """Maps image ids of one split to their CT/PT/label file paths."""
        return [{'CT': str(self.base_dir / split / 'images' / (pid + '__CT.nii.gz')),
                 'PT': str(self.base_dir / split / 'images' / (pid + '__PT.nii.gz')),
                 'label': str(self.base_dir / split / 'labels' / (pid + '.nii.gz'))
                 } for pid in ids]

    def setup(self, stage=None):
        """Builds transforms and datasets; runs on every process.

        BUGFIX: the id lists used to be assigned in prepare_data(). With
        strategy='ddp' that hook only runs on rank 0, so every other rank
        saw empty id lists, produced zero-length datasets, and crashed with
        "num_samples should be a positive integer ... got num_samples=0".
        Loading the ids here fixes multi-GPU training and is equally
        correct for the single-GPU case.
        """
        self.train_ids, self.valid_ids = self.get_data_list()
        self.train_transform, self.valid_transform = self.get_augmentation_transform(self.transform_dict)
        self.train_set = Dataset(data=self._build_file_dicts('train', self.train_ids),
                                 transform=self.train_transform)
        print(f'Size of train dataset: {len(self.train_set)}')
        self.val_set = Dataset(data=self._build_file_dicts('valid', self.valid_ids),
                               transform=self.valid_transform)
        print(f'Size of validation dataset: {len(self.val_set)}')
        if self.include_test:
            self.test_ids = self.get_test_data_list()
            self.test_set = Dataset(data=self._build_file_dicts('test', self.test_ids),
                                    transform=self.valid_transform)
            print(f'Size of test dataset: {len(self.test_set)}')

    def get_augmentation_transform(self, transform_dict: Dict[str, Any]):
        """Gets augmentation transforms.

        Returns:
            (train_augmentation, valid_augmentation): training pipeline with
            random intensity/flip augmentation; validation pipeline with
            loading + channel handling only. Both concatenate CT and PT
            into a 2-channel 'input' key; 'label' stays non-one-hot.
        """
        train_augmentation = Compose(
            [
                LoadImaged(keys=['CT', 'PT', 'label'], image_only=False),
                EnsureChannelFirstd(keys=['CT', 'PT', 'label']),
                RandGaussianNoised(keys=['CT']),
                RandStdShiftIntensityd(keys=['CT'], factors=0.2),
                RandScaleIntensityd(keys=['CT'], factors=0.1),
                RandFlipd(keys=['CT', 'PT', 'label'], prob=transform_dict['flip']['p'],
                          spatial_axis=transform_dict['flip']['axes']),
                EnsureTyped(keys=['CT', 'PT', 'label']),  # Note: label not in one-hot form
                ConcatItemsd(keys=['CT', 'PT'], name="input", dim=0)
            ]
        )
        valid_augmentation = Compose(
            [
                LoadImaged(keys=['CT', 'PT', 'label'], image_only=False),
                EnsureChannelFirstd(keys=['CT', 'PT', 'label']),
                ConcatItemsd(keys=['CT', 'PT'], name="input", dim=0)
            ]
        )
        return train_augmentation, valid_augmentation

    def train_dataloader(self):
        """Shuffled train loader; Lightning swaps in a DistributedSampler under DDP."""
        dataloader = DataLoader(self.train_set, batch_size=self.train_batch_size,
                                num_workers=self.train_num_workers, shuffle=True)
        print(f'Train dataloader length: {len(dataloader)}')
        # Fail fast with a clear message instead of the opaque sampler error.
        if len(dataloader) == 0:
            raise ValueError('No train data batch available.')
        return dataloader

    def val_dataloader(self):
        """Deterministic (unshuffled) validation loader."""
        dataloader = DataLoader(self.val_set, batch_size=self.valid_batch_size,
                                num_workers=self.valid_num_workers, shuffle=False)
        print(f'Validation dataloader length: {len(dataloader)}')
        if len(dataloader) == 0:
            raise ValueError('No validation data batch available.')
        return dataloader

    def test_dataloader(self):
        """Deterministic test loader; only meaningful when configs['test']['include'] is set."""
        dataloader = DataLoader(self.test_set, batch_size=self.test_batch_size,
                                num_workers=self.valid_num_workers, shuffle=False)
        print(f'Test dataloader length: {len(dataloader)}')
        if len(dataloader) == 0:
            raise ValueError('No test data batch available.')
        return dataloader
Trainer 构造代码:
# Build the Lightning Trainer. With strategy='ddp' and devices>1, Lightning
# spawns one process per device and (by default) wraps each dataloader's
# sampler in a DistributedSampler so every rank sees a distinct data shard.
# NOTE(review): FLAGS, logger, max_epochs, config and callbacks are defined
# elsewhere in the script — not visible in this snippet.
trainer = pl.Trainer(accelerator="gpu",
                     devices=FLAGS.num_devices,
                     # num_nodes=FLAGS.num_nodes,
                     logger=logger,
                     max_epochs=max_epochs,
                     check_val_every_n_epoch=1,
                     # precision=16,
                     num_sanity_val_steps=0,  # skip pre-fit validation batches
                     enable_model_summary=True,
                     enable_progress_bar=True,
                     log_every_n_steps=config['train']['logging_frequency_steps'],
                     callbacks=callbacks,
                     strategy=FLAGS.strategy
                     )
错误信息:
2023-08-21 20:22:25.522017: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations. To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags. W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT 2023-08-21 20:22:27.490978: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1960] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Skipping registering GPU devices... Max epochs: 300 Use Dice CrossEntropy loss Starting learning rate: 0.0001 GPU available: True (cuda), used: True TPU available: False, using: 0 TPU cores IPU available: False, using: 0 IPUs HPU available: False, using: 0 HPUs Locating data in /workspace/data/processed_128x128: 1830 for train and 390 for validation Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/2 2023-08-21 20:22:33.722437: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations. To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags. 2023-08-21 20:22:34.528897: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT 2023-08-21 20:22:35.020353: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1960] Cannot dlopen some GPU libraries. Skipping registering GPU devices... 
Max epochs: 300 Use Dice CrossEntropy loss Starting learning rate: 0.0001 Initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/2 I0821 20:22:36.928924 139652776452736 distributed_c10d.py:442] Added key: store_based_barrier_key:1 to store for rank: 1 I0821 20:22:36.938210 139831611515520 distributed_c10d.py:442] Added key: store_based_barrier_key:1 to store for rank: 0 I0821 20:22:36.938375 139831611515520 distributed_c10d.py:476] Rank 0: Completed store-based barrier for key:store_based_barrier_key:1 with 2 nodes.
---------------------------------------------------------------------------------------------------- distributed_backend=nccl All distributed processes registered. Starting with 2 processes
----------------------------------------------------------------------------------------------------
I0821 20:22:36.939174 139652776452736 distributed_c10d.py:476] Rank 1: Completed store-based barrier for key:store_based_barrier_key:1 with 2 nodes. You are using a CUDA device ('NVIDIA A100-SXM4-80GB') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance.
**Size of train dataset: 0 Size of validation dataset: 0 Size of train dataset: 1830 Size of validation dataset: 390** LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2,3] LOCAL_RANK: 1 - CUDA_VISIBLE_DEVICES: [0,1,2,3] ...
File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/sampler.py", line 107, in __init__
raise ValueError("num_samples should be a positive integer " ValueError: num_samples should be a positive integer value, but got num_samples=0
找到问题所在了:就在 prepare_data() 中。在 PyTorch Lightning 中,prepare_data() 仅在主进程(rank 0)上调用,因此不应在其中设置实例状态——该状态只会存在于这一个进程中,其他 rank 的 id 列表仍为空,数据集长度为 0。我把这些赋值移到了 setup()(它会在每个进程上运行),问题随之解决。