How do I fix the error "DistStoreError: Timed out after 61 seconds waiting for clients. 1/2 clients joined." when training a model with 2 GPUs?

Problem description

When I try to train on a single machine with two GPUs using the PyTorch framework, the program gets stuck at the

_init_dist_pytorch('nccl')
step. Single-stepping through the code shows that it is actually stuck at

return TCPStore(
            hostname, port, world_size, start_daemon, timeout, multi_tenant=True, use_libuv=use_libuv
        )

If I set world_size=1, the program runs normally and the whole process group is initialized successfully. However, with world_size=2 the program hangs and eventually raises

DistStoreError: Timed out after 300 seconds waiting for clients. 1/2 clients joined.

I have already tried the following to resolve the issue:

  • Changing the MASTER_PORT value.
  • Adjusting the firewall configuration (allowing traffic on port 8090 and on the port given by the MASTER_PORT environment variable).
  • Disabling the firewall entirely.
  • Running the script with python -m instead of plain python.

None of these attempts worked.

Expected output:

  • The script should return the TCPStore object normally.
  • The script should be able to train the model normally on a single machine with two GPUs.

Additional information:

  • I tried setting MASTER_PORT to different values, including 23455, 8090, and 12345.
  • I tried disabling the firewall with sudo ufw disable.
  • I tried running the script with python -m torch.distributed.launch.

I am looking for a solution that lets me train on a single machine with two GPUs without running into this error.

While trying to reproduce this error in a Jupyter notebook, I found that if I call the TCPStore constructor directly:
tmp = TCPStore('localhost', 2313, 1, True, timeout=default_pg_timeout,
               multi_tenant=True,use_libuv=False)
print('world_size=1 done')
tmp2 = TCPStore('localhost', 2313, 2, True, timeout=default_pg_timeout,
               multi_tenant=True,use_libuv=False)
print('world_size=2 done')

both constructor calls complete without any problem. However, if the TCPStore constructor is reached indirectly via
os.environ['RANK']='0'
os.environ['WORLD_SIZE']='2'
os.environ['MASTER_ADDR']='localhost'
os.environ['MASTER_PORT']='12340'
os.environ['CUDA_VISIBLE_DEVICES']='0,1'
_init_dist_pytorch('nccl')
print('world_size=2 process group initialized!')

the timeout error is triggered.
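
For comparison, the same world_size=2 store does come up as soon as a second process actually connects to it as a client. Below is a minimal two-process sketch of that handshake (not part of my original code; the port number 29501 is arbitrary and the client is spawned with the standard multiprocessing module):

import multiprocessing as mp
from datetime import timedelta
from torch.distributed import TCPStore

def _client():
    # non-master participant: connects (with retries) to the server opened below
    TCPStore('localhost', 29501, 2, False, timeout=timedelta(seconds=60))

if __name__ == '__main__':
    p = mp.Process(target=_client)
    p.start()
    # master (rank 0): counts itself as one participant and blocks until the
    # remaining world_size - 1 clients have joined
    TCPStore('localhost', 29501, 2, True, timeout=timedelta(seconds=60))
    print('2/2 clients joined')
    p.join()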

Here is the minimal reproducible example. (To make the problem easier to reproduce, I declared default_pg_timeout = timedelta(seconds=60) directly in the script, shortening the wait to 60 seconds; this variable defaults to 300 seconds. Adjust the value directly if necessary.)

In my environment the output is as follows (output 1 and output 2 were attached as screenshots).

The complete text of output 2:

{
    "name": "DistStoreError",
    "message": "Timed out after 61 seconds waiting for clients. 1/2 clients joined.",
    "stack": "---------------------------------------------------------------------------
DistStoreError                            Traceback (most recent call last)
Cell In[10], line 15
     13 os.environ['MASTER_PORT']='12340'
     14 os.environ['CUDA_VISIBLE_DEVICES']='0,1'
---> 15 _init_dist_pytorch('nccl')
     16 print('world_size=2 process group initialized!')

Cell In[9], line 6, in _init_dist_pytorch(backend, **kwargs)
      4 num_gpus = torch.cuda.device_count()
      5 torch.cuda.set_device(rank % num_gpus)
----> 6 dist.init_process_group(backend=backend, rank=rank, world_size=world_size)

File ~/miniconda3/envs/visualtext/lib/python3.10/site-packages/torch/distributed/c10d_logger.py:86, in _time_logger.<locals>.wrapper(*args, **kwargs)
     83 @functools.wraps(func)
     84 def wrapper(*args, **kwargs):
     85     t1 = time.time_ns()
---> 86     func_return = func(*args, **kwargs)
     87     time_spent = time.time_ns() - t1
     89     msg_dict = _get_msg_dict(func.__name__, *args, **kwargs)

File ~/miniconda3/envs/visualtext/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py:1177, in init_process_group(backend, init_method, timeout, world_size, rank, store, group_name, pg_options)
   1173 if store is None:
   1174     rendezvous_iterator = rendezvous(
   1175         init_method, rank, world_size, timeout=timeout
   1176     )
-> 1177     store, rank, world_size = next(rendezvous_iterator)
   1178     store.set_timeout(timeout)
   1180     # Use a PrefixStore to avoid accidental overrides of keys used by
   1181     # different systems (e.g. RPC) in case the store is multi-tenant.

File ~/miniconda3/envs/visualtext/lib/python3.10/site-packages/torch/distributed/rendezvous.py:246, in _env_rendezvous_handler(url, timeout, **kwargs)
    243 master_port = int(_get_env_or_raise(\"MASTER_PORT\"))
    244 use_libuv = query_dict.get(\"use_libuv\", os.environ.get(\"USE_LIBUV\", \"0\")) == \"1\"
--> 246 store = _create_c10d_store(master_addr, master_port, rank, world_size, timeout, use_libuv)
    248 yield (store, rank, world_size)
    250 # If this configuration is invalidated, there is nothing we can do about it

File ~/miniconda3/envs/visualtext/lib/python3.10/site-packages/torch/distributed/rendezvous.py:174, in _create_c10d_store(hostname, port, rank, world_size, timeout, use_libuv)
    172 else:
    173     start_daemon = rank == 0
--> 174     return TCPStore(
    175         hostname, port, world_size, start_daemon, timeout=datetime.timedelta(seconds=60), multi_tenant=True, use_libuv=use_libuv
    176     )

DistStoreError: Timed out after 61 seconds waiting for clients. 1/2 clients joined."
}

Code:

try:
    from urllib.parse import urlparse, urlunparse
except ImportError as e:
    raise ImportError(
        "urllib cannot be found, urlparse from python2 is no longer supported."
    ) from e

import numbers
import os
import sys
from datetime import timedelta
from typing import Dict, Optional

from torch.distributed import FileStore, PrefixStore, Store, TCPStore
default_pg_timeout = timedelta(seconds=60)
# from .constants import default_pg_timeout


_rendezvous_handlers = {}


def register_rendezvous_handler(scheme, handler):
    """
    Register a new rendezvous handler.

    Before we can run collective algorithms, participating processes
    need to find each other and exchange information to be able to
    communicate. We call this process rendezvous.

    The outcome of the rendezvous process is a triplet containing a
    shared key/value store, the rank of the process, and the total
    number of participating processes.

    If none of the bundled rendezvous methods apply to your execution
    environment you can opt to register your own rendezvous handler.
    Pick a unique name and use the URL scheme to identify it when
    calling the `rendezvous()` function.

    Args:
        scheme (str): URL scheme to identify your rendezvous handler.
        handler (function): Handler that is invoked when the
            `rendezvous()` function is called with a URL that uses
            the corresponding scheme. It must be a generator function
            that yields the triplet.
    """
    global _rendezvous_handlers
    if scheme in _rendezvous_handlers:
        raise RuntimeError(
            f"Rendezvous handler for {scheme}:// already registered"
        )
    _rendezvous_handlers[scheme] = handler


# Query will have format "rank=0&world_size=1" and is
# converted into {"rank": 0, "world_size": 1}
def _query_to_dict(query: str) -> Dict[str, str]:
    return {pair[0]: pair[1] for pair in (pair.split("=") for pair in filter(None, query.split("&")))}


def _rendezvous_helper(url: str, rank: int, world_size_opt: Optional[int], **kwargs):
    result = urlparse(url)
    if world_size_opt is None:
        world_size = -1
        if result.scheme == "env":
            rank = int(os.environ.get("RANK", rank))
            # If the world_size env variable is not present then it is a dynamic group
            world_size = int(os.environ.get("WORLD_SIZE", world_size))
    else:
        world_size = world_size_opt
    if rank != -1 or world_size != -1 or world_size_opt is None:
        query_dict = _query_to_dict(result.query)
        assert (
            "rank" not in query_dict and "world_size" not in query_dict
        ), f"The url: {url} has node-specific arguments(rank, world_size) already."
        if rank != -1:
            query_dict["rank"] = str(rank)
        if world_size != -1 or world_size_opt is None:
            query_dict["world_size"] = str(world_size)
        result = result._replace(
            query=f"{'&'.join([f'{k}={v}' for k, v in query_dict.items()])}"
        )
        url = urlunparse(result)

    if result.scheme not in _rendezvous_handlers:
        raise RuntimeError(f"No rendezvous handler for {result.scheme}://")
    return _rendezvous_handlers[result.scheme](url, **kwargs)


def rendezvous(url: str, rank: int = -1, world_size: int = -1, **kwargs):
    if not isinstance(url, (str, bytes)):
        raise RuntimeError(f"`url` must be a string. {type(url)}: {url}")

    if not isinstance(rank, numbers.Integral):
        raise RuntimeError(f"`rank` must be an integer. {rank}")

    if not isinstance(world_size, numbers.Integral):
        raise RuntimeError(f"`world_size` must be an integer. {world_size}")

    return _rendezvous_helper(url, rank, world_size, **kwargs)


def _create_store_from_options(backend_options, rank):
    store, _, _ = next(_rendezvous_helper(backend_options.init_method, rank, None))
    return store


def _rendezvous_error(msg):
    return ValueError("Error initializing torch.distributed using " + msg)


def _file_rendezvous_handler(url: str, **kwargs):
    def _error(msg):
        return _rendezvous_error("file:// rendezvous: " + msg)

    result = urlparse(url)
    path = result.path
    if sys.platform == "win32":
        import urllib.request

        full_path = result.netloc + result.path
        path = urllib.request.url2pathname(full_path)
        if path:
            # Normalizing an empty string produces ".", which is not expected.
            path = os.path.normpath(path)

    if not path:
        raise _error("path missing")
    query_dict = _query_to_dict(result.query)
    if "rank" not in query_dict:
        raise _error("rank parameter missing")
    if "world_size" not in query_dict:
        raise _error("world size parameter missing")

    rank = int(query_dict["rank"])
    world_size = int(query_dict["world_size"])
    store = FileStore(path, world_size)
    yield (store, rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform rerendezvous using file:// method")


def _torchelastic_use_agent_store() -> bool:
    return os.environ.get("TORCHELASTIC_USE_AGENT_STORE", None) == str(True)

import datetime
def _create_c10d_store(hostname, port, rank, world_size, timeout, use_libuv=False) -> Store:
    """
    Smartly creates a c10d Store object on ``rank`` based on whether we need to re-use agent store.

    The TCPStore server is assumed to be hosted
    on ``hostname:port``.

    If ``torchelastic_use_agent_store()`` is ``True``, then it is assumed that
    the agent leader (node rank 0) hosts the TCPStore server (for which the
    endpoint is specified by the given ``hostname:port``). Hence
    ALL ranks will create and return a TCPStore client (e.g. ``start_daemon=False``).

    If ``torchelastic_use_agent_store()`` is ``False``, then rank 0 will host
    the TCPStore (with multi-tenancy) and it is assumed that rank 0's hostname
    and port are correctly passed via ``hostname`` and ``port``. All
    non-zero ranks will create and return a TCPStore client.
    """
    # check if port is uint16_t
    if not 0 <= port < 2**16:
        raise ValueError(f"port must have value from 0 to 65535 but was {port}.")

    if _torchelastic_use_agent_store():
        attempt = os.environ["TORCHELASTIC_RESTART_COUNT"]
        tcp_store = TCPStore(hostname, port, world_size, False, timeout)
        return PrefixStore(f"/worker/attempt_{attempt}", tcp_store)
    else:
        start_daemon = rank == 0
        return TCPStore(
            hostname, port, world_size, start_daemon, timeout=datetime.timedelta(seconds=60), multi_tenant=True, use_libuv=use_libuv
        )


def _tcp_rendezvous_handler(
    url: str, timeout: timedelta = default_pg_timeout, **kwargs
):
    def _error(msg):
        return _rendezvous_error("tcp:// rendezvous: " + msg)

    result = urlparse(url)
    if not result.port:
        raise _error("port number missing")
    query_dict = _query_to_dict(result.query)
    if "rank" not in query_dict:
        raise _error("rank parameter missing")
    if "world_size" not in query_dict:
        raise _error("world size parameter missing")

    rank = int(query_dict["rank"])
    world_size = int(query_dict["world_size"])
    use_libuv = query_dict.get("use_libuv", "0") == "1"
    assert result.hostname is not None

    store = _create_c10d_store(result.hostname, result.port, rank, world_size, timeout, use_libuv)

    yield (store, rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform re-rendezvous using tcp:// method")


def _env_rendezvous_handler(
    url: str, timeout: timedelta = default_pg_timeout, **kwargs
):
    def _error(msg):
        return _rendezvous_error("env:// rendezvous: " + msg)

    def _env_error(var):
        return _error(f"environment variable {var} expected, but not set")

    def _get_env_or_raise(env_var: str) -> str:
        env_val = os.environ.get(env_var, None)
        if not env_val:
            raise _env_error(env_var)
        else:
            return env_val

    result = urlparse(url)
    query_dict = _query_to_dict(result.query)

    rank: int
    world_size: int
    master_port: int
    master_addr: str

    if "rank" in query_dict:
        rank = int(query_dict["rank"])
    else:
        rank = int(_get_env_or_raise("RANK"))

    if "world_size" in query_dict:
        world_size = int(query_dict["world_size"])
    else:
        world_size = int(_get_env_or_raise("WORLD_SIZE"))


    master_addr = _get_env_or_raise("MASTER_ADDR")
    master_port = int(_get_env_or_raise("MASTER_PORT"))
    use_libuv = query_dict.get("use_libuv", os.environ.get("USE_LIBUV", "0")) == "1"

    store = _create_c10d_store(master_addr, master_port, rank, world_size, timeout, use_libuv)

    yield (store, rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform re-rendezvous using env:// method")


register_rendezvous_handler("tcp", _tcp_rendezvous_handler)
register_rendezvous_handler("env", _env_rendezvous_handler)
register_rendezvous_handler("file", _file_rendezvous_handler)





""" return TCPStore(
            hostname, port, world_size, start_daemon, timeout=datetime.timedelta(seconds=60), multi_tenant=True, use_libuv=use_libuv
        ) """
tmp = TCPStore('localhost', 2313, 1, True, timeout=default_pg_timeout,
               multi_tenant=True,use_libuv=False)
print('world_size=1 done')
tmp2 = TCPStore('localhost', 2313, 2, True, timeout=default_pg_timeout,
               multi_tenant=True,use_libuv=False)
print('world_size=2 done')




import time
import accelerate
import torch.distributed as dist
import argparse
import logging
import os
import os.path as osp
import torch
import fastai

from basicsr.utils import (get_env_info, get_root_logger, get_time_str,
                           scandir)
from basicsr.utils.options import copy_opt_file, dict2str
from omegaconf import OmegaConf

from ldm.data.dataset_depth import DepthDataset
from ldm.data.dataset_sketch import SketchDataset
from basicsr.utils.dist_util import get_dist_info, init_dist, master_only
from ldm.modules.encoders.adapter import Adapter
from ldm.util import load_model_from_config
import random



def _init_dist_pytorch(backend, **kwargs):
    rank = int(os.environ['RANK'])
    world_size = int(os.environ['WORLD_SIZE'])
    num_gpus = torch.cuda.device_count()
    torch.cuda.set_device(rank % num_gpus)
    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)


# world_size=1 process group could be initialized successfully
""" os.environ['RANK']='0'
os.environ['WORLD_SIZE']='1'
os.environ['MASTER_ADDR']='localhost'
os.environ['MASTER_PORT']='23144'
os.environ['CUDA_VISIBLE_DEVICES']='0'
_init_dist_pytorch('nccl')
print('world_size=1 process group initialized!') """

os.environ['RANK']='0'
os.environ['WORLD_SIZE']='2'
os.environ['MASTER_ADDR']='localhost'
os.environ['MASTER_PORT']='12340'
os.environ['CUDA_VISIBLE_DEVICES']='0,1'
_init_dist_pytorch('nccl')
print('world_size=2 process group initialized!')
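
For comparison, here is a sketch of how the same initialization would be driven with one process per rank via torch.multiprocessing.spawn (not part of the repro above; port 12340 is reused from the example):

# sketch: start one worker process per GPU so that both ranks join the rendezvous
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def _worker(rank, world_size):
    os.environ['RANK'] = str(rank)
    os.environ['WORLD_SIZE'] = str(world_size)
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12340'
    torch.cuda.set_device(rank)
    dist.init_process_group(backend='nccl', rank=rank, world_size=world_size)
    print(f'rank {rank}: process group initialized')
    dist.destroy_process_group()

if __name__ == '__main__':
    mp.spawn(_worker, args=(2,), nprocs=2)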

Environment:

Python 3.10.14

requirements:
transformers==4.19.2
diffusers==0.11.1
invisible_watermark==0.1.5
basicsr==1.4.2
einops==0.6.0
omegaconf==2.3.0
pytorch_lightning==1.5.9
gradio
opencv-python
pudb
imageio
imageio-ffmpeg
k-diffusion
webdataset
open-clip-torch
kornia
safetensors
timm
torch
torchvision
numpy
matplotlib
accelerate
Tags: pytorch, distributed
1 Answer

I solved a similar problem by running the script with
torchrun your_script.py

According to https://pytorch.org/docs/stable/elastic/run.html, you do not need to set all of those environment variables yourself.
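
On a single machine with two GPUs that would look roughly like the following (a sketch; train.py stands in for your own training script). torchrun spawns one worker per GPU and exports RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT for each of them, so the default env:// rendezvous can pick them up:

# launch: torchrun --standalone --nproc_per_node=2 train.py
# inside train.py:
import os
import torch
import torch.distributed as dist

dist.init_process_group(backend='nccl')               # reads RANK/WORLD_SIZE/... from the environment
torch.cuda.set_device(int(os.environ['LOCAL_RANK']))  # LOCAL_RANK is set by torchrun per worker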
