在 Torch rpc 教程中使用 GPU 服务进行强化学习时出现“CUDA 内存不足”

问题描述 投票:0回答:1

我按照这个教程在Torch上使用RPC实现强化学习。

目前,我使用一个训练者进程和一个观察者进程。

训练器进程创建模型,观察者进程使用RPC向前调用模型。

按照原始教程所示为模型添加指定的GPU设备后,我遇到了“cuda内存不足”问题。

为了简化重现,我从教程中删除了一些原始代码。

from itertools import count

import gym
import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist

AGENT_NAME = "agent_{}"
OBSERVER_NAME = "obs_{}_for_{}"

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)


class Observer:

    def __init__(self):
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        # self.env.seed(args.seed)

    def run_episode(self, agent_rref):
        state, ep_reward = self.env.reset()[0], 0
        for _ in range(10000):
            # send the state to the agent to get an action
            action = agent_rref.rpc_sync().select_action(self.id, state)
            # apply the action to the environment, and get the reward
            state, reward, terminated, truncated, _ = self.env.step(action)
            # report the reward to the agent for training purpose
            agent_rref.rpc_sync().report_reward(self.id, reward)
            # finishes after the number of self.env._max_episode_steps
            if terminated or truncated:
                break
        torch.cuda.empty_cache()


class Agent:
    def __init__(self, rank, observer_size_pre_trainer, infos):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.saved_log_probs = {}
        self.device_id = rank % torch.cuda.device_count()
        self.policy = Policy().to(self.device_id)
        self.rank = rank
        for ob_rank in range(0, observer_size_pre_trainer):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank, rank))
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = []

    def select_action(self, ob_id, state):
        state: torch.Tensor = torch.from_numpy(state).float().unsqueeze(0).to(self.device_id)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        result = action.item()

        del action, m, state, probs
        return result

    def report_reward(self, ob_id, reward):
        self.rewards[ob_id].append(reward)

    def run_episode(self):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    ob_rref.rpc_sync().run_episode,
                    args=(self.agent_rref,)
                )
            )

        # wait until all obervers have finished this episode
        for fut in futs:
            fut.wait()

    def finish_episode(self):
        # joins probs and rewards from different observers into lists
        R, probs, rewards = 0, [], []
        for ob_id in self.rewards:
            probs.extend(self.saved_log_probs[ob_id])
            rewards.extend(self.rewards[ob_id])

        # clear saved probs and rewards
        for ob_id in self.rewards:
            self.rewards[ob_id] = []
            self.saved_log_probs[ob_id] = []
        del probs, rewards
        return 0


def get_observer_name(rank, trainer_size):
    observer_rank = (rank - trainer_size) // trainer_size
    trainer_rank = rank % trainer_size
    return OBSERVER_NAME.format(observer_rank, trainer_rank)


def run_worker(rank, trainer_size, observer_size_pre_trainer, infos):
    rpc_backend_options = rpc.TensorPipeRpcBackendOptions(
        init_method='tcp://localhost:29500',
        num_worker_threads=1024,
    )
    world_size = observer_size_pre_trainer * trainer_size + trainer_size
    if rank < trainer_size:
        dist.init_process_group(
            rank=rank, world_size=trainer_size, init_method="tcp://localhost:29501"
        )
        name = AGENT_NAME.format(rank)
        print(f"{name} started")
        rpc.init_rpc(name, rank=rank, world_size=world_size,
                     rpc_backend_options=rpc_backend_options)

        agent = Agent(rank, observer_size_pre_trainer, infos)
        for i_episode in count(1):
            agent.run_episode()
            agent.finish_episode()
            print(f"episode : {i_episode}, mem_used: {torch.cuda.memory_allocated(agent.device_id) / 1024 / 1024:.2f}Mb")
    else:
        observer = get_observer_name(rank, trainer_size)
        print(f"{observer} started")
        # other ranks are the observer
        rpc.init_rpc(observer, rank=rank, world_size=world_size,
                     rpc_backend_options=rpc_backend_options)
        # observers passively waiting for instructions from the agent

    # block until all rpcs finish, and shutdown the RPC instance
    rpc.shutdown()


def main():
    mp.spawn(
        run_worker,
        args=(1, 1, {}),
        nprocs=2,
        join=True
    )


if __name__ == "__main__":
    torch.multiprocessing.set_start_method('spawn')
    main()

我得到了结果

/home/lu/PycharmProjects/tetris/venv/bin/python /home/lu/PycharmProjects/tetris/test_error.py 
obs_0_for_0 started
WARNING: Logging before InitGoogleLogging() is written to STDERR
I20230807 23:22:56.714407 262881 ProcessGroupNCCL.cpp:665] [Rank 0] ProcessGroupNCCL initialized with following options:
NCCL_ASYNC_ERROR_HANDLING: 0
NCCL_DESYNC_DEBUG: 0
NCCL_BLOCKING_WAIT: 0
TIMEOUT(ms): 1800000
USE_HIGH_PRIORITY_STREAM: 0
I20230807 23:22:56.714471 262980 ProcessGroupNCCL.cpp:842] [Rank 0] NCCL watchdog thread started!
agent_0 started
/home/lu/PycharmProjects/tetris/venv/lib/python3.11/site-packages/gym/utils/passive_env_checker.py:233: DeprecationWarning: `np.bool8` is a deprecated alias for `np.bool_`.  (Deprecated NumPy 1.24)
  if not isinstance(terminated, (bool, np.bool8)):
episode : 1, mem_used: 146.25Mb
episode : 2, mem_used: 251.88Mb
episode : 3, mem_used: 438.75Mb
episode : 4, mem_used: 682.50Mb
.....

episode : 44, mem_used: 4834.38Mb

At:
  /usr/lib/python3.11/site-packages/torch/distributed/rpc/internal.py(234): _handle_exception
')
Traceback (most recent call last):
  File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/internal.py", line 207, in _run_function
    result = python_udf.func(*python_udf.args, **python_udf.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/rref_proxy.py", line 42, in _invoke_rpc
    return _rref_type_cont(rref_fut)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/rref_proxy.py", line 31, in _rref_type_cont
    return rpc_api(
           ^^^^^^^^
  File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/api.py", line 82, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/api.py", line 809, in rpc_sync
    return fut.wait()
           ^^^^^^^^^^
RuntimeError: RuntimeError: On WorkerInfo(id=1, name=obs_0_for_0):
RuntimeError('OutOfMemoryError: On WorkerInfo(id=0, name=agent_0):
OutOfMemoryError('CUDA out of memory. Tried to allocate 20.00 MiB (GPU 0; 10.75 GiB total capacity; 4.73 GiB already allocated; 10.88 MiB free; 5.82 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF')
Traceback (most recent call last):
  File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/internal.py", line 207, in _run_function
    result = python_udf.func(*python_udf.args, **python_udf.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/rref_proxy.py", line 11, in _local_invoke
    return getattr(rref.local_value(), func_name)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lu/PycharmProjects/tetris/test_error.py", line 71, in select_action
    probs = self.policy(state)
            ^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/torch/nn/modules/module.py", line 1501, in _call_impl
    return forward_call(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lu/PycharmProjects/tetris/test_error.py", line 25, in forward
    x = self.affine1(x)
        ^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/torch/nn/modules/module.py", line 1501, in _call_impl
    return forward_call(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/torch/nn/modules/linear.py", line 114, in forward
    return F.linear(input, self.weight, self.bias)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
torch.cuda.OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB (GPU 0; 10.75 GiB total capacity; 4.73 GiB already allocated; 10.88 MiB free; 5.82 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF



Process finished with exit code 1

我使用manjaro(Arch linux操作系统),python 3.11和torch 2.0.1

cuda device rpc torch oom
1个回答
0
投票

显然你的 GPU 内存不足。

尝试添加:

del variables
gc.collect()

(需要导入

gc
)在你的行之后:

torch.cuda.empty_cache()

这可能会在一段时间内有所帮助。

否则,您将不得不减少批量大小。

无论如何,torch.cuda.memory_summary应该可以帮助您详细查看内存问题。

© www.soinside.com 2019 - 2024. All rights reserved.