Commit

ADD: distributed tooling and examples
matbun committed Oct 31, 2023
1 parent 2b703f8 commit b0589c6
Showing 10 changed files with 765 additions and 54 deletions.
59 changes: 54 additions & 5 deletions experimental/cluster.py
@@ -1,5 +1,6 @@
import abc
import os
import time

from lightning.pytorch.plugins.environments import (
    ClusterEnvironment as LightningClusterEnvironment,
@@ -14,13 +15,23 @@ class ClusterEnvironment(LightningClusterEnvironment):
    def num_nodes(self) -> int:
        """Returns the number of nodes allocated for the current job."""

    @abc.abstractmethod
    def job_id(self) -> str:
        """Returns the current job ID inferred from the cluster."""


class SLURMEnvironment(LightningSLURMEnvironment):
    def num_nodes(self) -> int:
        """Returns the number of nodes allocated for the current job."""
        if os.environ.get('SLURM_JOB_NUM_NODES'):
            return int(os.environ['SLURM_JOB_NUM_NODES'])
        return int(os.environ['SLURM_NNODES'])
        if os.environ.get('SLURM_NNODES'):
            return int(os.environ['SLURM_NNODES'])
        raise RuntimeError('Number of nodes not found in SLURM env variables')

    def job_id(self) -> str:
        """Returns the current job ID inferred from the cluster."""
        return os.environ['SLURM_JOB_ID']


class TorchElasticEnvironment(LightningTorchElasticEnvironment):
@@ -30,19 +41,57 @@ def num_nodes(self) -> int:
        lwsize = int(os.environ['LOCAL_WORLD_SIZE'])
        return gwsize // lwsize

    def job_id(self) -> str:
        """Returns the current job ID inferred from the cluster."""
        return os.environ['TORCHELASTIC_RUN_ID']


class LocalEnvironment(LightningEnvironment):

    _job_id: str = None

    def world_size(self) -> int:
        if os.environ.get('WORLD_SIZE'):
            return int(os.environ.get('WORLD_SIZE'))
        # if os.environ.get('WORLD_SIZE'):
        #     return int(os.environ.get('WORLD_SIZE'))
        print(
            "WARNING: world_size() method in 'LocalEnvironment' returns "
            f"a fixed-value placeholder world_size={self._world_size}. "
            "Use it carefully!"
        )
        return self._world_size

    def global_rank(self) -> int:
        if os.environ.get('RANK'):
            return int(os.environ.get('RANK'))
        # if os.environ.get('RANK'):
        #     return int(os.environ.get('RANK'))
        print(
            "WARNING: global_rank() method in 'LocalEnvironment' returns "
            f"a fixed-value placeholder global_rank={self._global_rank}. "
            "Use it carefully!"
        )
        return self._global_rank

    def num_nodes(self) -> int:
        """Returns the number of nodes allocated for the current job."""
        return 1

    def job_id(self) -> str:
        """Returns the current job ID inferred from the cluster."""
        if self._job_id is None:
            self._job_id = str(time.time())
        return self._job_id


def detect_cluster() -> ClusterEnvironment:
    """Selects the ClusterEnvironment to use, depending on
    availability and priority.
    """
    if SLURMEnvironment.detect():
        cluster = SLURMEnvironment()
    elif TorchElasticEnvironment.detect():
        cluster = TorchElasticEnvironment()
    elif LocalEnvironment.detect():
        cluster = LocalEnvironment()
    else:
        raise NotImplementedError("Unrecognized cluster env")
    return cluster
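
Usage note (not part of this diff): a minimal sketch of how the cluster module above could be exercised, assuming cluster.py is on the import path; the function and method names come from the code shown in this commit.

from cluster import detect_cluster

# Picks SLURMEnvironment, TorchElasticEnvironment, or LocalEnvironment,
# in that priority order, based on what each detect() reports.
cluster = detect_cluster()
print(f"nodes={cluster.num_nodes()}, job_id={cluster.job_id()}")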
71 changes: 39 additions & 32 deletions experimental/distrib_launcher.py
@@ -6,10 +6,12 @@

from strategy import Strategy, DDPStrategy
from launcher import DummyTorchElasticLauncher, TorchElasticLauncher
from cluster import (
    LocalEnvironment, SLURMEnvironment,
    TorchElasticEnvironment
from launcher_factory import (
    LauncherFactory,
    SimpleLauncherFactory,
    TorchElasticLauncherFactory
)
from distributed_tools import DistributedTooling


class UniformRndDataset(Dataset):
@@ -65,6 +67,7 @@ def trainer_entrypoint_fn(a, strategy: Strategy):
    return 123


LAUNCHER = 'torch-elastic-no'
STRATEGY = 'ddp'

RUN_ID = "my_run_id"
@@ -74,36 +77,40 @@ def trainer_entrypoint_fn(a, strategy: Strategy):
MAX_RESTARTS = 2

if __name__ == "__main__":
    # STRATEGY BUILDER

    # Instantiate ClusterEnv
    if SLURMEnvironment.detect():
        cluster = SLURMEnvironment()
    elif TorchElasticEnvironment.detect():
        cluster = TorchElasticEnvironment()
    elif LocalEnvironment.detect():
        cluster = LocalEnvironment()
    else:
        raise NotImplementedError("Unrecognized cluster env")

    print(cluster)

    # Instantiate Launcher
    # launcher = DummyTorchElasticLauncher(
    #     cluster=cluster,
    #     n_workers_per_node=NPROC_PRE_NODE,
    #     min_nodes=MIN_NODES,
    #     max_nodes=MAX_NODES
    # # STRATEGY BUILDER

    # # Instantiate Launcher Factory
    # # launcher = DummyTorchElasticLauncher(
    # #     n_workers_per_node=NPROC_PRE_NODE,
    # #     min_nodes=MIN_NODES,
    # #     max_nodes=MAX_NODES
    # # )
    # # launcher = TorchElasticLauncher(
    # #     rdzv_id=RUN_ID,
    # #     nproc_per_node=NPROC_PRE_NODE,
    # #     nnodes=f"{MIN_NODES}:{MAX_NODES}",
    # #     max_restarts=MAX_RESTARTS
    # # )
    # if LAUNCHER == 'torch-elastic':
    #     launcher_builder: LauncherFactory = TorchElasticLauncherFactory()
    # else:
    #     launcher_builder: LauncherFactory = SimpleLauncherFactory()

    # # Instantiate launcher
    # launcher = launcher_builder.createLauncher(
    #     n_workers_per_node=NPROC_PRE_NODE
    # )
    launcher = TorchElasticLauncher(nproc_per_node=NPROC_PRE_NODE)

    # Instantiate Strategy
    if (STRATEGY == 'ddp'
            and torch.cuda.is_available()
            and torch.cuda.device_count() > 1):
        strategy = DDPStrategy(cluster=cluster, backend='nccl')
    else:
        raise NotImplementedError

    # # Instantiate Strategy
    # if (STRATEGY == 'ddp'
    #         and torch.cuda.is_available()
    #         and torch.cuda.device_count() > 1):
    #     strategy = DDPStrategy(cluster=None, backend='nccl')
    # else:
    #     raise NotImplementedError

    dist_tools = DistributedTooling(n_workers_per_node=NPROC_PRE_NODE)
    launcher, strategy = dist_tools.getTools('ddp')

    # CLIENT CODE
    # Launch training from launcher
68 changes: 68 additions & 0 deletions experimental/distributed_tools.py
@@ -0,0 +1,68 @@
from typing import Tuple
import abc

from launcher import Launcher
from strategy import Strategy, DDPStrategy
from launcher_factory import TorchElasticLauncherFactory


class Assembler(abc.ABC):
"""Abstract Assembler class."""


class DistributedTooling(Assembler):
"""
Assembles a set of objects used to enable distributed ML.
Suggests working presets of Launcher and Strategy, providing
an easy entry point for the end user.
"""

def __init__(self, n_workers_per_node: int = 1) -> None:
super().__init__()
self.n_workers_per_node = n_workers_per_node

def getTools(self, strategy: str) -> Tuple[Launcher, Strategy]:
if strategy == 'ddp':
return self.getTorchDDPTools()
if strategy == 'deepspeed':
return self.getDeepSpeedTools()
if strategy == 'horovod':
return self.getHorovodTools()
raise ValueError(f"Unrecognized strategy={strategy}")

def getTorchDDPTools(self) -> Tuple[Launcher, Strategy]:
"""
Returns a suggested preset of Launcher + Strategy
for torch distributed data parallel.
"""
import torch
if not torch.cuda.is_available():
raise RuntimeError(
"Torch DDP cannot be used. GPUs not available."
)
if not torch.cuda.device_count() > 1:
raise RuntimeError(
"Torch DDP cannot be used. Only one GPU is available."
)
launcher_builder = TorchElasticLauncherFactory()
elastic_launcher = launcher_builder.createLauncher(
n_workers_per_node=self.n_workers_per_node
)
strategy = DDPStrategy(backend='nccl')
return elastic_launcher, strategy

def getDeepSpeedTools(self) -> Tuple[Launcher, Strategy]:
"""
Returns a suggested preset of Launcher + Strategy
for DeepSpeed distributed ML.
"""
# TODO: complete
raise NotImplementedError

def getHorovodTools(self) -> Tuple[Launcher, Strategy]:
"""
Returns a suggested preset of Launcher + Strategy
for Horovod distributed ML.
"""
# TODO: complete
raise NotImplementedError
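
Usage note (not part of this diff): a minimal sketch of how DistributedTooling is meant to be consumed, mirroring the call made in distrib_launcher.py; it assumes a node with more than one GPU, since getTorchDDPTools() raises otherwise, and the worker count of 4 is an arbitrary placeholder.

from distributed_tools import DistributedTooling

# Ask the assembler for a matching Launcher + Strategy preset for torch DDP.
dist_tools = DistributedTooling(n_workers_per_node=4)
launcher, strategy = dist_tools.getTools('ddp')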
85 changes: 85 additions & 0 deletions experimental/example_0.py
@@ -0,0 +1,85 @@
"""
Run this with torchrun
"""

import os

import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset

from strategy import Strategy, DDPStrategy, HorovodStrategy


class UniformRndDataset(Dataset):
    def __init__(self, x_size: int, y_size: int, len: int = 100):
        super().__init__()
        self.x_size = x_size
        self.y_size = y_size
        self.len = len

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        return torch.rand(self.x_size), torch.rand(self.y_size)


def trainer_entrypoint_fn(a, strategy: Strategy):
"""Dummy training function."""
strategy.setup()
print(f"{a}: {os.environ.get('RANK')} {os.environ.get('LOCAL_RANK')} "
f"{os.environ.get('MASTER_ADDR')} {os.environ.get('MASTER_PORT')}")

# Local model
model = nn.Linear(3, 4)
optim = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
# Distributed model
model: nn.Module = strategy.distribute_model(model)
optim: torch.optim.Optimizer = strategy.distribute_optimizer(optim)

# Data
train_set = UniformRndDataset(x_size=3, y_size=4)
train_loader = DataLoader(train_set, batch_size=10, num_workers=1)
# Distributed dataloader
train_loader: DataLoader = strategy.distribute_dataloader(train_loader)

for epoch in range(2):
for (x, y) in train_loader:
# print(f"tensor to cuda:{strategy.device}")
x = x.to(strategy.device)
y = y.to(strategy.device)

optim.zero_grad()
y_pred = model(x)
loss = loss_fn(y_pred, y)
loss.backward()
optim.step()

if strategy.is_main_worker():
print(f"Loss [epoch={epoch}]: {loss.item()}")

strategy.teardown()
return 123


STRATEGY = 'ddp'


if __name__ == "__main__":

    # Instantiate Strategy
    if STRATEGY == 'ddp':
        if (not torch.cuda.is_available()
                or not torch.cuda.device_count() > 1):
            raise RuntimeError('Resources unavailable')

        strategy = DDPStrategy(cluster=None, backend='nccl')
    elif STRATEGY == 'horovod':
        strategy = HorovodStrategy()
    else:
        raise NotImplementedError

    # Launch distributed training
    trainer_entrypoint_fn("foobar", strategy)
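
Usage note (not part of this diff): the module docstring says to run this with torchrun, so a plausible invocation is

torchrun --nproc_per_node=2 example_0.py

where the worker count of 2 is an assumption; it should match the number of GPUs on the node, and the DDP branch requires more than one.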