I am running a distributed training Python script with PyTorch Lightning using DDP. I use DDPStrategy to define the backend, a custom timeout, and a custom cluster environment implemented as a ClusterEnvironment subclass.
The relevant part of my training code looks like this:
strategy = DDPStrategy(
    cluster_environment=CustomEnvironment(),
    process_group_backend="nccl",
    timeout=CUSTOM_TIMEOUT,
    find_unused_parameters=True,
)

# Initialize a trainer
trainer = pl.Trainer(
    logger=logger,
    callbacks=[checkpoint_callback],
    max_epochs=hparams["epochs"],
    devices=devices,
    accelerator=accelerator,
    strategy=strategy,
)
where devices = 4 (since each node has 4 GPUs) and accelerator = "gpu".
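For context, a minimal sketch of how these two values could be derived on each node instead of being hard-coded (assuming torch is importable and the GPUs are visible):

import torch

# Hypothetical snippet: pick the accelerator and device count from what the node exposes.
if torch.cuda.is_available():
    accelerator = "gpu"
    devices = torch.cuda.device_count()  # 4 on each of my nodes
else:
    accelerator = "cpu"
    devices = 1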
The CustomEnvironment class is defined as follows:
import logging
import os
from datetime import timedelta
from typing import Any, Dict, Union

from pytorch_lightning.plugins.environments import ClusterEnvironment
from pytorch_lightning.strategies.ddp import DDPStrategy

log = logging.getLogger("train")  # matches the DEBUG:train:... lines in the output below

DEFAULT_TIMEOUT = timedelta(seconds=1800)
CUSTOM_TIMEOUT = timedelta(seconds=3600)


class CustomEnvironment(ClusterEnvironment):
    def __init__(self, num_nodes=2):
        super().__init__()
        self._num_nodes = num_nodes
        self._master_port = None
        self._world_size = None
        self._global_rank = None

    def creates_processes_externally(self):
        # Assuming PyTorch Lightning manages processes internally
        return False

    def detect(self):
        # Implement detection of nodes and processes if necessary
        log.debug("Detect method is called.")

    def global_rank(self):
        if self._global_rank is None:
            self._global_rank = int(os.getenv("RANK", 0))
        log.debug(f"GLOBAL_RANK: {self._global_rank}")
        return self._global_rank

    @property
    def main_address(self):
        return self.master_address()

    @property
    def main_port(self):
        return self.master_port()

    def set_global_rank(self, rank: int):
        self._global_rank = rank
        log.debug(f"Set GLOBAL_RANK: {self._global_rank}")

    def set_world_size(self, world_size: int):
        self._world_size = world_size
        log.debug(f"Set WORLD_SIZE: {self._world_size}")

    def master_address(self):
        MASTER_ADDR = os.getenv("MASTER_ADDR")
        log.debug(f"MASTER_ADDR: {MASTER_ADDR}")
        return MASTER_ADDR

    def master_port(self):
        if self._master_port is None:
            self._master_port = os.getenv("MASTER_PORT")
        log.debug(f"MASTER_PORT: {self._master_port}")
        return int(self._master_port)

    def world_size(self):
        if self._world_size is None:
            log.debug("WORLD_SIZE is not set.")
        return self._world_size

    def node_rank(self):
        MY_RANK = int(os.getenv("NODE_RANK", "0"))
        log.debug(f"NODE_RANK: {MY_RANK}")
        return int(MY_RANK)

    def local_rank(self) -> int:
        LOCAL_RANK = int(os.getenv("LOCAL_RANK", "0"))
        log.debug(f"LOCAL_RANK: {LOCAL_RANK}")
        return LOCAL_RANK
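As a sanity check I can exercise the class outside of Lightning; a minimal sketch, with the environment variables filled in with the values from the master node of my run (in the real run they are exported by the launcher):

import os

# Illustrative values only; normally these are exported on each node before launch.
os.environ.update({
    "MASTER_ADDR": "10.0.147.2",
    "MASTER_PORT": "9001",
    "NODE_RANK": "0",
    "RANK": "0",
    "LOCAL_RANK": "0",
    "WORLD_SIZE": "8",
})

env = CustomEnvironment(num_nodes=2)
env.set_world_size(int(os.environ["WORLD_SIZE"]))
env.set_global_rank(int(os.environ["RANK"]))
print(env.main_address, env.main_port, env.node_rank(), env.local_rank(), env.world_size())
# expected: 10.0.147.2 9001 0 0 8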
On the master node I get this configuration:
DEBUG:train:NODE_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:Set GLOBAL_RANK: 0
DEBUG:train:Set WORLD_SIZE: 8
DEBUG:train:GLOBAL_RANK: 0
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
2024-08-21 17:51:24 training on gpu with 4 gpu
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:NODE_RANK: 0
DEBUG:train:NODE_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:Set GLOBAL_RANK: 0
DEBUG:train:Set WORLD_SIZE: 8
DEBUG:train:GLOBAL_RANK: 0
DEBUG:train:GLOBAL_RANK: 0
DEBUG:train:GLOBAL_RANK: 0
DEBUG:train:MASTER_ADDR: 10.0.147.2
DEBUG:train:MASTER_PORT: 9001
Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/8
On the client node I get:
DEBUG:train:NODE_RANK: 1
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:Set GLOBAL_RANK: 4
DEBUG:train:Set WORLD_SIZE: 8
DEBUG:train:GLOBAL_RANK: 4
2024-08-21 17:51:24 training on gpu with 4 gpu
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:NODE_RANK: 1
DEBUG:train:NODE_RANK: 1
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:Set GLOBAL_RANK: 4
DEBUG:train:Set WORLD_SIZE: 8
DEBUG:train:GLOBAL_RANK: 4
DEBUG:train:GLOBAL_RANK: 4
DEBUG:train:GLOBAL_RANK: 4
DEBUG:train:MASTER_ADDR: 10.0.147.2
DEBUG:train:MASTER_PORT: 9001
Initializing distributed: GLOBAL_RANK: 4, MEMBER: 5/8
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:LOCAL_RANK: 0
DEBUG:train:GLOBAL_RANK: 4
This is consistent with the environment I set: MASTER_ADDR, MASTER_PORT, NODE_RANK and WORLD_SIZE are the minimum variables required, and in my configuration I have 4 GPUs x 2 nodes, so WORLD_SIZE is set to 8.
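To make that concrete, the rendezvous environment on the two nodes boils down to the following (a sketch in Python rather than shell exports; the values are the ones visible in the logs above, and RANK is the global rank of local process 0 on each node):

import os

# Shared by both nodes: where the rendezvous store lives and the total number of processes.
common = {"MASTER_ADDR": "10.0.147.2", "MASTER_PORT": "9001", "WORLD_SIZE": "8"}

master_node = {**common, "NODE_RANK": "0", "RANK": "0"}  # node 0, 4 GPUs
client_node = {**common, "NODE_RANK": "1", "RANK": "4"}  # node 1, 4 GPUs

os.environ.update(master_node)  # on node 0; use client_node on node 1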
In addition, I manually set the NCCL environment variables: I point NCCL at the host's network interface (taken from ipconfig) and disable any P2P transfers (the NCCL_P2P_DISABLE variable disables peer-to-peer (P2P) transport, which uses CUDA direct access between GPUs over NVLink or PCI):
export NCCL_SOCKET_IFNAME=eth0
export NCCL_P2P_DISABLE=1
With this configuration, despite the custom timeout, I run into a timeout error. Specifically, on the MASTER I get a TCPStore error:
return TCPStore(
torch.distributed.DistStoreError: Timed out after 1801 seconds waiting for clients. 2/4 clients joined.
and then on the client I get a disconnection error:
[rank4]:[W821 14:18:47.728693991 socket.cpp:428] [c10d] While waitForInput, poolFD failed with (errno: 0 - Success).
2024-08-21 14:18:47 [4] is setting up NCCL communicator and retrieving ncclUniqueId from [0] via c10d key-value store by key '0', but store->get('0') got error: Connection reset by peer
...
torch.distributed.DistBackendError: [4] is setting up NCCL communicator and retrieving ncclUniqueId from [0] via c10d key-value store by key '0', but store->get('0') got error: Connection reset by peer
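To help isolate the problem from Lightning, a minimal smoke test of the same rendezvous with bare torch.distributed (a sketch; it assumes MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE and LOCAL_RANK are exported for every process, as above):

import os
from datetime import timedelta

import torch
import torch.distributed as dist

# Every rank joins the same TCPStore rendezvous (env:// reads MASTER_ADDR,
# MASTER_PORT, RANK, WORLD_SIZE) and then performs a single all_reduce.
dist.init_process_group(
    backend="nccl",
    init_method="env://",
    timeout=timedelta(seconds=3600),
)
local_rank = int(os.getenv("LOCAL_RANK", "0"))
torch.cuda.set_device(local_rank)
t = torch.ones(1, device=f"cuda:{local_rank}")
dist.all_reduce(t)
print(f"rank {dist.get_rank()}/{dist.get_world_size()} sum={t.item()}")
dist.destroy_process_group()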