我按照一个教程（PyTorch 分布式 RPC 的强化学习示例），使用 RPC 实现强化学习。
目前，我使用一个训练器（trainer）进程和一个观察者（observer）进程。
训练器进程创建模型，观察者进程通过 RPC 调用模型的前向计算（forward）。
在原教程代码的基础上为模型指定 GPU 设备后，我遇到了“CUDA out of memory”（显存不足）错误。
为了便于复现，我删除了教程中的部分原始代码。
from itertools import count
import gym
import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
# RPC name template for an agent (trainer) worker, formatted with the trainer
# rank, e.g. AGENT_NAME.format(0) -> "agent_0".
AGENT_NAME: str = "agent_{}"
# RPC name template for an observer worker, formatted with
# (observer index within its trainer, trainer rank), e.g. "obs_0_for_0".
OBSERVER_NAME: str = "obs_{}_for_{}"
class Policy(nn.Module):
    """Small MLP policy producing probabilities over 2 actions from 4 features.

    Pipeline: Linear(4 -> 128) -> Dropout(p=0.6) -> ReLU -> Linear(128 -> 2)
    -> softmax over dim 1.
    """

    def __init__(self):
        super().__init__()
        # Sub-modules are registered in this exact order (affine1, dropout,
        # affine2) so parameter initialisation consumes the RNG identically
        # and the state_dict keys stay the same.
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)

    def forward(self, x):
        """Map a batch of states to action probabilities.

        Args:
            x: input whose last dimension has size 4. The softmax is taken over
                dim 1, so a 2-D ``(batch, 4)`` tensor is assumed.

        Returns:
            Tensor of action probabilities; each row sums to 1.
        """
        hidden = F.relu(self.dropout(self.affine1(x)))
        logits = self.affine2(hidden)
        return F.softmax(logits, dim=1)
class Observer:
    """Environment-side RPC worker: steps CartPole-v1 and asks the agent for actions.

    Instances are created remotely by the agent (``remote(ob_info, Observer)``),
    so ``__init__`` runs inside the observer's own RPC worker process.
    """

    def __init__(self):
        # This worker's RPC id; the agent keys its per-observer reward and
        # log-prob buffers by this value.
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        # NOTE(review): `env.seed()` is gone in gym >= 0.26 (this class already
        # uses the 0.26 reset/step API); seed via `self.env.reset(seed=...)` if
        # reproducibility is required.

    def run_episode(self, agent_rref, max_steps=10000):
        """Play one episode, querying the agent for every action.

        Args:
            agent_rref: RRef to the Agent; its ``select_action`` and
                ``report_reward`` methods are called synchronously over RPC.
            max_steps: upper bound on environment steps for this episode
                (defaults to the previously hard-coded 10000).
        """
        state = self.env.reset()[0]
        for _ in range(max_steps):
            # Send the state to the agent to get an action.
            action = agent_rref.rpc_sync().select_action(self.id, state)
            # Apply the action to the environment and collect the reward.
            state, reward, terminated, truncated, _ = self.env.step(action)
            # Report the reward to the agent for training purposes.
            agent_rref.rpc_sync().report_reward(self.id, reward)
            # The episode ends on termination or on the env's time limit.
            if terminated or truncated:
                break
        # No torch.cuda.empty_cache() here: this process creates no CUDA
        # tensors, and the CUDA caching allocator is per-process, so it could
        # not release any memory held by the agent process.
class Agent:
    """Trainer-side RPC worker that owns the policy and drives its observers.

    Args:
        rank: this trainer's rank. It selects the CUDA device
            (``rank % torch.cuda.device_count()``) and is used to look up the
            observers assigned to this trainer.
        observer_size_pre_trainer: number of observers belonging to this trainer.
        infos: not used by this class; kept for interface compatibility.
    """

    def __init__(self, rank, observer_size_pre_trainer, infos):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        # Per-observer buffers, keyed by the observer's RPC worker id.
        self.rewards = {}
        self.saved_log_probs = {}
        # NOTE: raises ZeroDivisionError when no CUDA device is visible.
        self.device_id = rank % torch.cuda.device_count()
        self.policy = Policy().to(self.device_id)
        self.rank = rank
        for ob_rank in range(observer_size_pre_trainer):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank, rank))
            # Construct each Observer on its own worker and keep an RRef to it.
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = []

    def select_action(self, ob_id, state):
        """Sample an action for ``state`` and record its log-probability.

        Invoked remotely by observer ``ob_id``. Returns a plain Python number
        (via ``.item()``) so no CUDA tensor is sent back over RPC.
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device_id)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        # Stored together with its autograd graph for a later policy update.
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

    def report_reward(self, ob_id, reward):
        """Append a reward reported by observer ``ob_id``."""
        self.rewards[ob_id].append(reward)

    def run_episode(self):
        """Start one episode on every observer concurrently and wait for all of them."""
        # `rref.rpc_async().method(...)` executes Observer.run_episode directly
        # on the RRef's owner. The previous form sent the `rpc_sync()` proxy
        # itself as the RPC target, so each observer then issued a second,
        # blocking RPC to itself and tied up an extra RPC thread per episode.
        futs = [
            ob_rref.rpc_async().run_episode(self.agent_rref)
            for ob_rref in self.ob_rrefs
        ]
        # Wait until all observers have finished this episode.
        for fut in futs:
            fut.wait()

    def finish_episode(self):
        """Gather this episode's log-probs and rewards, then reset the buffers.

        No loss is computed here; the gathered lists are discarded and 0 is
        returned.
        """
        # Join per-observer log-probs and rewards into flat lists.
        probs, rewards = [], []
        for ob_id in self.rewards:
            probs.extend(self.saved_log_probs[ob_id])
            rewards.extend(self.rewards[ob_id])
        # TODO: compute the policy-gradient loss from `probs` / `rewards` here.
        # Resetting the buffers also drops the autograd graphs (and the device
        # tensors they keep alive) referenced by the saved log-probs.
        for ob_id in self.rewards:
            self.rewards[ob_id] = []
            self.saved_log_probs[ob_id] = []
        return 0
def get_observer_name(rank, trainer_size):
    """Return the RPC name of the observer process with global ``rank``.

    Observer ranks come after the trainer ranks and are dealt out round-robin:
    observer ``rank`` serves trainer ``rank % trainer_size`` and is that
    trainer's ``(rank - trainer_size) // trainer_size``-th observer.
    """
    # Python's % is periodic, so (rank - trainer_size) % trainer_size equals
    # rank % trainer_size; a single divmod yields both indices.
    observer_rank, trainer_rank = divmod(rank - trainer_size, trainer_size)
    return OBSERVER_NAME.format(observer_rank, trainer_rank)
def run_worker(rank, trainer_size, observer_size_pre_trainer, infos,
               num_worker_threads=16, max_episodes=None):
    """Per-process entry point: ranks below ``trainer_size`` are agents, the rest observers.

    Args:
        rank: global rank of this process (supplied by ``mp.spawn``).
        trainer_size: number of agent (trainer) processes.
        observer_size_pre_trainer: number of observers per agent.
        infos: forwarded unchanged to ``Agent``.
        num_worker_threads: size of each process's RPC thread pool. Defaults to
            16, the TensorPipe default. NOTE(review): the previous value of 1024
            is the likely cause of the steadily growing CUDA memory. Each RPC
            thread that runs ``select_action`` lazily gets its own cuBLAS handle
            and workspace, so allocated memory keeps rising as more of the 1024
            threads touch the GPU. Confirm with ``torch.cuda.memory_summary()``.
        max_episodes: if given, the agent stops after this many episodes so
            ``rpc.shutdown()`` is reached. ``None`` keeps the original endless loop.
    """
    rpc_backend_options = rpc.TensorPipeRpcBackendOptions(
        init_method='tcp://localhost:29500',
        num_worker_threads=num_worker_threads,
    )
    world_size = observer_size_pre_trainer * trainer_size + trainer_size
    is_trainer = rank < trainer_size
    if is_trainer:
        # Collective process group spanning only the trainers (separate port).
        dist.init_process_group(
            rank=rank, world_size=trainer_size, init_method="tcp://localhost:29501"
        )
        name = AGENT_NAME.format(rank)
        print(f"{name} started")
        rpc.init_rpc(name, rank=rank, world_size=world_size,
                     rpc_backend_options=rpc_backend_options)
        agent = Agent(rank, observer_size_pre_trainer, infos)
        for i_episode in count(1):
            agent.run_episode()
            agent.finish_episode()
            print(f"episode : {i_episode}, mem_used: {torch.cuda.memory_allocated(agent.device_id) / 1024 / 1024:.2f}Mb")
            if max_episodes is not None and i_episode >= max_episodes:
                break
    else:
        observer = get_observer_name(rank, trainer_size)
        print(f"{observer} started")
        # Observers only passively serve RPCs issued by their agent.
        rpc.init_rpc(observer, rank=rank, world_size=world_size,
                     rpc_backend_options=rpc_backend_options)
    # Block until all outstanding RPCs finish, then shut down the RPC agent.
    rpc.shutdown()
    if is_trainer:
        dist.destroy_process_group()
def main(trainer_size=1, observer_size_pre_trainer=1):
    """Spawn one process per RPC worker: every trainer plus its observers.

    ``nprocs`` is derived from the same counts passed to ``run_worker``, so it
    always equals the RPC ``world_size`` computed there. A mismatch would make
    ``rpc.init_rpc`` wait for ranks that never start.
    """
    world_size = trainer_size * observer_size_pre_trainer + trainer_size
    mp.spawn(
        run_worker,
        args=(trainer_size, observer_size_pre_trainer, {}),
        nprocs=world_size,
        join=True,
    )
if __name__ == "__main__":
    # mp.spawn already uses the 'spawn' start method. Setting it globally keeps
    # any other multiprocessing use consistent, since CUDA cannot be used in
    # forked children.
    mp.set_start_method('spawn')
    main()
运行后得到如下输出：
/home/lu/PycharmProjects/tetris/venv/bin/python /home/lu/PycharmProjects/tetris/test_error.py
obs_0_for_0 started
WARNING: Logging before InitGoogleLogging() is written to STDERR
I20230807 23:22:56.714407 262881 ProcessGroupNCCL.cpp:665] [Rank 0] ProcessGroupNCCL initialized with following options:
NCCL_ASYNC_ERROR_HANDLING: 0
NCCL_DESYNC_DEBUG: 0
NCCL_BLOCKING_WAIT: 0
TIMEOUT(ms): 1800000
USE_HIGH_PRIORITY_STREAM: 0
I20230807 23:22:56.714471 262980 ProcessGroupNCCL.cpp:842] [Rank 0] NCCL watchdog thread started!
agent_0 started
/home/lu/PycharmProjects/tetris/venv/lib/python3.11/site-packages/gym/utils/passive_env_checker.py:233: DeprecationWarning: `np.bool8` is a deprecated alias for `np.bool_`. (Deprecated NumPy 1.24)
if not isinstance(terminated, (bool, np.bool8)):
episode : 1, mem_used: 146.25Mb
episode : 2, mem_used: 251.88Mb
episode : 3, mem_used: 438.75Mb
episode : 4, mem_used: 682.50Mb
.....
episode : 44, mem_used: 4834.38Mb
At:
/usr/lib/python3.11/site-packages/torch/distributed/rpc/internal.py(234): _handle_exception
')
Traceback (most recent call last):
File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/internal.py", line 207, in _run_function
result = python_udf.func(*python_udf.args, **python_udf.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/rref_proxy.py", line 42, in _invoke_rpc
return _rref_type_cont(rref_fut)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/rref_proxy.py", line 31, in _rref_type_cont
return rpc_api(
^^^^^^^^
File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/api.py", line 82, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/api.py", line 809, in rpc_sync
return fut.wait()
^^^^^^^^^^
RuntimeError: RuntimeError: On WorkerInfo(id=1, name=obs_0_for_0):
RuntimeError('OutOfMemoryError: On WorkerInfo(id=0, name=agent_0):
OutOfMemoryError('CUDA out of memory. Tried to allocate 20.00 MiB (GPU 0; 10.75 GiB total capacity; 4.73 GiB already allocated; 10.88 MiB free; 5.82 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation. See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF')
Traceback (most recent call last):
File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/internal.py", line 207, in _run_function
result = python_udf.func(*python_udf.args, **python_udf.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/site-packages/torch/distributed/rpc/rref_proxy.py", line 11, in _local_invoke
return getattr(rref.local_value(), func_name)(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/lu/PycharmProjects/tetris/test_error.py", line 71, in select_action
probs = self.policy(state)
^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/site-packages/torch/nn/modules/module.py", line 1501, in _call_impl
return forward_call(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/lu/PycharmProjects/tetris/test_error.py", line 25, in forward
x = self.affine1(x)
^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/site-packages/torch/nn/modules/module.py", line 1501, in _call_impl
return forward_call(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/site-packages/torch/nn/modules/linear.py", line 114, in forward
return F.linear(input, self.weight, self.bias)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
torch.cuda.OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB (GPU 0; 10.75 GiB total capacity; 4.73 GiB already allocated; 10.88 MiB free; 5.82 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation. See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF
Process finished with exit code 1
运行环境：Manjaro（基于 Arch Linux），Python 3.11，PyTorch 2.0.1。
显然你的 GPU 显存不足。
尝试在 `torch.cuda.empty_cache()` 这一行之后添加：
    del variables
    gc.collect()
（需要先 `import gc`）。
这可能会在一段时间内有所帮助。
否则，你将不得不减小批量大小（batch size）。
无论如何，`torch.cuda.memory_summary()` 应该可以帮助你详细查看显存的使用情况。