
Timeout errors in PyTorch Lightning distributed training with the NCCL backend

monkey0506 · 2 months ago

I am running a distributed training Python script with PyTorch Lightning using DDP. I use DDPStrategy to define the backend, a custom timeout, and a custom cluster environment implemented as a ClusterEnvironment subclass.

The relevant part of my training code looks like this:

    strategy = DDPStrategy(
        cluster_environment=CustomEnvironment(),
        process_group_backend="nccl",
        timeout=CUSTOM_TIMEOUT,
        find_unused_parameters=True)

    # Initialize a trainer
    trainer = pl.Trainer(logger=logger,
                         callbacks=[checkpoint_callback],
                         max_epochs=hparams["epochs"],
                         devices=devices,
                         accelerator=accelerator,
                         strategy=strategy)

where devices = 4 (since each node has 4 GPUs) and accelerator = "gpu". CustomEnvironment is defined as follows:

import logging
import os
from datetime import timedelta
from typing import Any, Dict, Union

from pytorch_lightning.plugins.environments import ClusterEnvironment
from pytorch_lightning.strategies.ddp import DDPStrategy

# Module-level logger used by CustomEnvironment below
log = logging.getLogger(__name__)

DEFAULT_TIMEOUT = timedelta(seconds=1800)
CUSTOM_TIMEOUT = timedelta(seconds=3600)

class CustomEnvironment(ClusterEnvironment):
    def __init__(self, num_nodes=2):
        super().__init__()
        self._num_nodes = num_nodes
        self._master_port = None
        self._world_size = None
        self._global_rank = None

    def creates_processes_externally(self):
        # Assuming PyTorch Lightning manages processes internally
        return False

    def detect(self):
        # Implement detection of nodes and processes if necessary
        log.debug("Detect method is called.")

    def global_rank(self):
        if self._global_rank is None:
            self._global_rank = int(os.getenv("RANK", 0))
        log.debug(f"GLOBAL_RANK: {self._global_rank}")
        return self._global_rank

    @property
    def main_address(self):
        return self.master_address()

    @property
    def main_port(self):
        return self.master_port()

    def set_global_rank(self, rank: int):
        self._global_rank = rank
        log.debug(f"Set GLOBAL_RANK: {self._global_rank}")

    def set_world_size(self, world_size: int):
        self._world_size = world_size
        log.debug(f"Set WORLD_SIZE: {self._world_size}")

    def master_address(self):
        MASTER_ADDR = os.getenv("MASTER_ADDR")
        log.debug(f"MASTER_ADDR: {MASTER_ADDR}")
        return MASTER_ADDR

    def master_port(self):
        if self._master_port is None:
            self._master_port = os.getenv("MASTER_PORT")
        log.debug(f"MASTER_PORT: {self._master_port}")
        return int(self._master_port)

    def world_size(self):
        if self._world_size is None:
            log.debug("WORLD_SIZE is not set.")
        return self._world_size

    def node_rank(self):
        MY_RANK = int(os.getenv("NODE_RANK", "0"))
        log.debug(f"NODE_RANK: {MY_RANK}")
        return int(MY_RANK)

    def local_rank(self) -> int:
        LOCAL_RANK = int(os.getenv("LOCAL_RANK", "0"))
        log.debug(f"LOCAL_RANK: {LOCAL_RANK}")
        return LOCAL_RANK
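As an aside, a quick sanity check like the following (a minimal sketch, not part of the training script, and assuming MASTER_ADDR and MASTER_PORT are already exported on the node) prints what the class resolves from the shell environment:

    env = CustomEnvironment(num_nodes=2)
    # main_address/main_port read MASTER_ADDR/MASTER_PORT; node_rank()/local_rank()
    # fall back to 0 when NODE_RANK/LOCAL_RANK are not set.
    print(env.main_address, env.main_port, env.node_rank(), env.local_rank())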

On the master node I get this configuration:

DEBUG:train:NODE_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:Set GLOBAL_RANK: 0
DEBUG:train:Set WORLD_SIZE: 8
DEBUG:train:GLOBAL_RANK: 0
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
2024-08-21 17:51:24 training on gpu with 4 gpu
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:NODE_RANK: 0
DEBUG:train:NODE_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:Set GLOBAL_RANK: 0
DEBUG:train:Set WORLD_SIZE: 8
DEBUG:train:GLOBAL_RANK: 0
DEBUG:train:GLOBAL_RANK: 0
DEBUG:train:GLOBAL_RANK: 0
DEBUG:train:MASTER_ADDR: 10.0.147.2
DEBUG:train:MASTER_PORT: 9001
Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/8

On the client node I get:

DEBUG:train:NODE_RANK: 1
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:Set GLOBAL_RANK: 4
DEBUG:train:Set WORLD_SIZE: 8
DEBUG:train:GLOBAL_RANK: 4
2024-08-21 17:51:24 training on gpu with 4 gpu
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:NODE_RANK: 1
DEBUG:train:NODE_RANK: 1
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:Set GLOBAL_RANK: 4
DEBUG:train:Set WORLD_SIZE: 8
DEBUG:train:GLOBAL_RANK: 4
DEBUG:train:GLOBAL_RANK: 4
DEBUG:train:GLOBAL_RANK: 4
DEBUG:train:MASTER_ADDR: 10.0.147.2
DEBUG:train:MASTER_PORT: 9001
Initializing distributed: GLOBAL_RANK: 4, MEMBER: 5/8
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:GLOBAL_RANK: 4

This is consistent with the environment I set: MASTER_ADDR, MASTER_PORT, NODE_RANK and WORLD_SIZE are the minimal environment required. In my configuration I have 4 GPUs x 2 nodes, so WORLD_SIZE is set to 8.
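For clarity, this is the per-node environment the setup above assumes (a minimal sketch; the address and port values are the ones visible in the logs above):

    import os

    # Same MASTER_ADDR/MASTER_PORT on both nodes; NODE_RANK differs per node.
    os.environ["MASTER_ADDR"] = "10.0.147.2"
    os.environ["MASTER_PORT"] = "9001"
    os.environ["NODE_RANK"] = "0"           # "1" on the second node
    os.environ["WORLD_SIZE"] = str(4 * 2)   # 4 GPUs per node x 2 nodes = 8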

In addition, I manually set the NCCL environment variable for the network interface on the hosts (taken from ifconfig) and disabled any P2P (the NCCL_P2P_DISABLE variable disables peer-to-peer (P2P) transport, which uses CUDA direct access between GPUs over NVLink or PCIe).

export NCCL_SOCKET_IFNAME=eth0
export NCCL_P2P_DISABLE=1
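The same settings can also be applied at the top of the training script so that every rank spawned by Lightning inherits them (a sketch; NCCL_DEBUG=INFO is an optional addition that makes NCCL log which interface and transport it actually selects):

    import os

    os.environ["NCCL_SOCKET_IFNAME"] = "eth0"  # bind NCCL to this interface
    os.environ["NCCL_P2P_DISABLE"] = "1"       # disable P2P (NVLink/PCIe) transport
    os.environ["NCCL_DEBUG"] = "INFO"          # optional: verbose NCCL init logging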

Despite the custom timeout, with this configuration I run into a timeout error. Specifically, on the MASTER I get a TCPStore error:

    return TCPStore(
torch.distributed.DistStoreError: Timed out after 1801 seconds waiting for clients. 2/4 clients joined.

Then on the client I get a disconnection error:

[rank4]:[W821 14:18:47.728693991 socket.cpp:428] [c10d] While waitForInput, poolFD failed with (errno: 0 - Success).
2024-08-21 14:18:47 [4] is setting up NCCL communicator and retrieving ncclUniqueId from [0] via c10d key-value store by key '0', but store->get('0') got error: Connection reset by peer
...
torch.distributed.DistBackendError: [4] is setting up NCCL communicator and retrieving ncclUniqueId from [0] via c10d key-value store by key '0', but store->get('0') got error: Connection reset by peer
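Since both errors involve the c10d key-value store rendezvous, a minimal TCPStore connectivity check can help isolate networking from Lightning itself (a sketch, assuming the same MASTER_ADDR/MASTER_PORT as the training job; run it with is_master=True on the master and is_master=False on the client):

    from datetime import timedelta
    from torch.distributed import TCPStore

    is_master = False  # set to True on 10.0.147.2
    # world_size=2 means the master waits for one client to connect
    store = TCPStore("10.0.147.2", 9001, world_size=2, is_master=is_master,
                     timeout=timedelta(seconds=60))
    if is_master:
        store.set("ping", "pong")
    else:
        print(store.get("ping"))  # should print b'pong'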