diff --git a/experimental/cluster.py b/experimental/cluster.py index 981c8de2..78ae8ead 100644 --- a/experimental/cluster.py +++ b/experimental/cluster.py @@ -1,5 +1,6 @@ import abc import os +import time from lightning.pytorch.plugins.environments import ( ClusterEnvironment as LightningClusterEnvironment, @@ -14,13 +15,23 @@ class ClusterEnvironment(LightningClusterEnvironment): def num_nodes(self) -> int: """Returns the number of nodes allocated for the current job.""" + @abc.abstractmethod + def job_id(self) -> str: + """Returns the current job ID inferred from the cluster.""" + class SLURMEnvironment(LightningSLURMEnvironment): def num_nodes(self) -> int: """Returns the number of nodes allocated for the current job.""" if os.environ.get('SLURM_JOB_NUM_NODES'): return int(os.environ['SLURM_JOB_NUM_NODES']) - return int(os.environ['SLURM_NNODES']) + if os.environ.get('SLURM_NNODES'): + return int(os.environ['SLURM_NNODES']) + raise RuntimeError('Number of nodes not found in SLURM env variables') + + def job_id(self) -> str: + """Returns the current job ID inferred from the cluster.""" + return os.environ['SLURM_JOB_ID'] class TorchElasticEnvironment(LightningTorchElasticEnvironment): @@ -30,19 +41,57 @@ def num_nodes(self) -> int: lwsize = int(os.environ['LOCAL_WORLD_SIZE']) return gwsize//lwsize + def job_id(self) -> str: + """Returns the current job ID inferred from the cluster.""" + return os.environ['TORCHELASTIC_RUN_ID'] + class LocalEnvironment(LightningEnvironment): + _job_id: str = None + def world_size(self) -> int: - if os.environ.get('WORLD_SIZE'): - return int(os.environ.get('WORLD_SIZE')) + # if os.environ.get('WORLD_SIZE'): + # return int(os.environ.get('WORLD_SIZE')) + print( + "WARNING: world_size() method in 'LocalEnvironment' returns " + f"a fixed-value placeholder world_size={self._world_size}. " + "Use it carefully!" + ) return self._world_size def global_rank(self) -> int: - if os.environ.get('RANK'): - return int(os.environ.get('RANK')) + # if os.environ.get('RANK'): + # return int(os.environ.get('RANK')) + print( + "WARNING: global_rank() method in 'LocalEnvironment' returns " + f"a fixed-value placeholder global_rank={self._global_rank}. " + "Use it carefully!" + ) return self._global_rank def num_nodes(self) -> int: """Returns the number of nodes allocated for the current job.""" return 1 + + def job_id(self) -> str: + """Returns the current job ID inferred from the cluster.""" + if self._job_id is None: + self._job_id = str(time.time()) + return self._job_id + + +def detect_cluster() -> ClusterEnvironment: + """Defines a protocol to select the ClusterEnvironment + depending on availability and priority. + """ + + if SLURMEnvironment.detect(): + cluster = SLURMEnvironment() + elif TorchElasticEnvironment.detect(): + cluster = TorchElasticEnvironment() + elif LocalEnvironment.detect(): + cluster = LocalEnvironment() + else: + raise NotImplementedError("Unrecognized cluster env") + return cluster diff --git a/experimental/distrib_launcher.py b/experimental/distrib_launcher.py index 16f3a249..d8f4e881 100644 --- a/experimental/distrib_launcher.py +++ b/experimental/distrib_launcher.py @@ -6,10 +6,12 @@ from strategy import Strategy, DDPStrategy from launcher import DummyTorchElasticLauncher, TorchElasticLauncher -from cluster import ( - LocalEnvironment, SLURMEnvironment, - TorchElasticEnvironment +from launcher_factory import ( + LauncherFactory, + SimpleLauncherFactory, + TorchElasticLauncherFactory ) +from distributed_tools import DistributedTooling class UniformRndDataset(Dataset): @@ -65,6 +67,7 @@ def trainer_entrypoint_fn(a, strategy: Strategy): return 123 +LAUNCHER = 'torch-elastic-no' STRATEGY = 'ddp' RUN_ID = "my_run_id" @@ -74,36 +77,40 @@ def trainer_entrypoint_fn(a, strategy: Strategy): MAX_RESTARTS = 2 if __name__ == "__main__": - # STRATEGY BUILDER - - # Instantiate ClusterEnv - if SLURMEnvironment.detect(): - cluster = SLURMEnvironment() - elif TorchElasticEnvironment.detect(): - cluster = TorchElasticEnvironment() - elif LocalEnvironment.detect(): - cluster = LocalEnvironment() - else: - raise NotImplementedError("Unrecognized cluster env") - - print(cluster) - - # Instantiate Launcher - # launcher = DummyTorchElasticLauncher( - # cluster=cluster, - # n_workers_per_node=NPROC_PRE_NODE, - # min_nodes=MIN_NODES, - # max_nodes=MAX_NODES + # # STRATEGY BUILDER + + # # Instantiate Launcher Factory + # # launcher = DummyTorchElasticLauncher( + # # n_workers_per_node=NPROC_PRE_NODE, + # # min_nodes=MIN_NODES, + # # max_nodes=MAX_NODES + # # ) + # # launcher = TorchElasticLauncher( + # # rdzv_id=RUN_ID, + # # nproc_per_node=NPROC_PRE_NODE, + # # nnodes=f"{MIN_NODES}:{MAX_NODES}", + # # max_restarts=MAX_RESTARTS + # # ) + # if LAUNCHER == 'torch-elastic': + # launcher_builder: LauncherFactory = TorchElasticLauncherFactory() + # else: + # launcher_builder: LauncherFactory = SimpleLauncherFactory() + + # # Instantiate launcher + # launcher = launcher_builder.createLauncher( + # n_workers_per_node=NPROC_PRE_NODE # ) - launcher = TorchElasticLauncher(nproc_per_node=NPROC_PRE_NODE) - - # Instantiate Strategy - if (STRATEGY == 'ddp' - and torch.cuda.is_available() - and torch.cuda.device_count() > 1): - strategy = DDPStrategy(cluster=cluster, backend='nccl') - else: - raise NotImplementedError + + # # Instantiate Strategy + # if (STRATEGY == 'ddp' + # and torch.cuda.is_available() + # and torch.cuda.device_count() > 1): + # strategy = DDPStrategy(cluster=None, backend='nccl') + # else: + # raise NotImplementedError + + dist_tools = DistributedTooling(n_workers_per_node=NPROC_PRE_NODE) + launcher, strategy = dist_tools.getTools('ddp') # CLIENT CODE # Launch training from launcher diff --git a/experimental/distributed_tools.py b/experimental/distributed_tools.py new file mode 100644 index 00000000..83bf241f --- /dev/null +++ b/experimental/distributed_tools.py @@ -0,0 +1,68 @@ +from typing import Tuple +import abc + +from launcher import Launcher +from strategy import Strategy, DDPStrategy +from launcher_factory import TorchElasticLauncherFactory + + +class Assembler(abc.ABC): + """Abstract Assembler class.""" + + +class DistributedTooling(Assembler): + """ + Assembles a set of objects used to enable distributed ML. + Suggests working presets of Launcher and Strategy, providing + an easy entry point for the end user. + """ + + def __init__(self, n_workers_per_node: int = 1) -> None: + super().__init__() + self.n_workers_per_node = n_workers_per_node + + def getTools(self, strategy: str) -> Tuple[Launcher, Strategy]: + if strategy == 'ddp': + return self.getTorchDDPTools() + if strategy == 'deepspeed': + return self.getDeepSpeedTools() + if strategy == 'horovod': + return self.getHorovodTools() + raise ValueError(f"Unrecognized strategy={strategy}") + + def getTorchDDPTools(self) -> Tuple[Launcher, Strategy]: + """ + Returns a suggested preset of Launcher + Strategy + for torch distributed data parallel. + """ + import torch + if not torch.cuda.is_available(): + raise RuntimeError( + "Torch DDP cannot be used. GPUs not available." + ) + if not torch.cuda.device_count() > 1: + raise RuntimeError( + "Torch DDP cannot be used. Only one GPU is available." + ) + launcher_builder = TorchElasticLauncherFactory() + elastic_launcher = launcher_builder.createLauncher( + n_workers_per_node=self.n_workers_per_node + ) + strategy = DDPStrategy(backend='nccl') + return elastic_launcher, strategy + + def getDeepSpeedTools(self) -> Tuple[Launcher, Strategy]: + """ + Returns a suggested preset of Launcher + Strategy + for DeepSpeed distributed ML. + """ + # TODO: complete + raise NotImplementedError + + def getHorovodTools(self) -> Tuple[Launcher, Strategy]: + """ + Returns a suggested preset of Launcher + Strategy + for Horovod distributed ML. + """ + # TODO: complete + raise NotImplementedError diff --git a/experimental/example_0.py b/experimental/example_0.py new file mode 100644 index 00000000..d18a40db --- /dev/null +++ b/experimental/example_0.py @@ -0,0 +1,85 @@ +""" +Run this with torchrun +""" + +import os + +import torch +from torch import nn +from torch.utils.data import DataLoader, Dataset + +from strategy import Strategy, DDPStrategy, HorovodStrategy + + +class UniformRndDataset(Dataset): + def __init__(self, x_size: int, y_size: int, len: int = 100): + super().__init__() + self.x_size = x_size + self.y_size = y_size + self.len = len + + def __len__(self): + return self.len + + def __getitem__(self, index): + return torch.rand(self.x_size), torch.rand(self.y_size) + + +def trainer_entrypoint_fn(a, strategy: Strategy): + """Dummy training function.""" + strategy.setup() + print(f"{a}: {os.environ.get('RANK')} {os.environ.get('LOCAL_RANK')} " + f"{os.environ.get('MASTER_ADDR')} {os.environ.get('MASTER_PORT')}") + + # Local model + model = nn.Linear(3, 4) + optim = torch.optim.Adam(model.parameters(), lr=1e-3) + loss_fn = nn.MSELoss() + # Distributed model + model: nn.Module = strategy.distribute_model(model) + optim: torch.optim.Optimizer = strategy.distribute_optimizer(optim) + + # Data + train_set = UniformRndDataset(x_size=3, y_size=4) + train_loader = DataLoader(train_set, batch_size=10, num_workers=1) + # Distributed dataloader + train_loader: DataLoader = strategy.distribute_dataloader(train_loader) + + for epoch in range(2): + for (x, y) in train_loader: + # print(f"tensor to cuda:{strategy.device}") + x = x.to(strategy.device) + y = y.to(strategy.device) + + optim.zero_grad() + y_pred = model(x) + loss = loss_fn(y_pred, y) + loss.backward() + optim.step() + + if strategy.is_main_worker(): + print(f"Loss [epoch={epoch}]: {loss.item()}") + + strategy.teardown() + return 123 + + +STRATEGY = 'ddp' + + +if __name__ == "__main__": + + # Instantiate Strategy + if STRATEGY == 'ddp': + if (not torch.cuda.is_available() + or not torch.cuda.device_count() > 1): + raise RuntimeError('Resources unavailable') + + strategy = DDPStrategy(cluster=None, backend='nccl') + elif STRATEGY == 'horovod': + strategy = HorovodStrategy() + else: + raise NotImplementedError + + # Launch distributed training + trainer_entrypoint_fn("foobar", strategy) diff --git a/experimental/example_1.py b/experimental/example_1.py new file mode 100644 index 00000000..3cc2e452 --- /dev/null +++ b/experimental/example_1.py @@ -0,0 +1,106 @@ +""" +Introduction of launcher. Torchrun is not needed anymore. +""" +import os + +import torch +from torch import nn +from torch.utils.data import DataLoader, Dataset + +from strategy import Strategy, DDPStrategy, HorovodStrategy +from launcher import TorchElasticLauncher, SimpleLauncher + + +class UniformRndDataset(Dataset): + def __init__(self, x_size: int, y_size: int, len: int = 100): + super().__init__() + self.x_size = x_size + self.y_size = y_size + self.len = len + + def __len__(self): + return self.len + + def __getitem__(self, index): + return torch.rand(self.x_size), torch.rand(self.y_size) + + +def trainer_entrypoint_fn(a, strategy: Strategy): + """Dummy training function.""" + strategy.setup() + print(f"{a}: {os.environ.get('RANK')} {os.environ.get('LOCAL_RANK')} " + f"{os.environ.get('MASTER_ADDR')} {os.environ.get('MASTER_PORT')}") + + # Local model + model = nn.Linear(3, 4) + optim = torch.optim.Adam(model.parameters(), lr=1e-3) + loss_fn = nn.MSELoss() + # Distributed model + model: nn.Module = strategy.distribute_model(model) + optim: torch.optim.Optimizer = strategy.distribute_optimizer(optim) + + # Data + train_set = UniformRndDataset(x_size=3, y_size=4) + train_loader = DataLoader(train_set, batch_size=10, num_workers=1) + # Distributed dataloader + train_loader: DataLoader = strategy.distribute_dataloader(train_loader) + + for epoch in range(2): + for (x, y) in train_loader: + # print(f"tensor to cuda:{strategy.device}") + x = x.to(strategy.device) + y = y.to(strategy.device) + + optim.zero_grad() + y_pred = model(x) + loss = loss_fn(y_pred, y) + loss.backward() + optim.step() + + if strategy.is_main_worker(): + print(f"Loss [epoch={epoch}]: {loss.item()}") + + strategy.teardown() + return 123 + + +LAUNCHER = 'torch-elastic' +STRATEGY = 'ddp' +RUN_ID = "my_run_id" +MIN_NODES = 1 +MAX_NODES = 1 +NPROC_PRE_NODE = 4 +MAX_RESTARTS = 2 + +if __name__ == "__main__": + + # Instantiate Launcher Factory + if LAUNCHER == 'torch-elastic': + launcher = TorchElasticLauncher( + rdzv_id=RUN_ID, + nproc_per_node=NPROC_PRE_NODE, + nnodes=f"{MIN_NODES}:{MAX_NODES}", + max_restarts=MAX_RESTARTS + ) + elif LAUNCHER == 'simple-launcher': + launcher = SimpleLauncher( + nproc_per_node=NPROC_PRE_NODE + ) + else: + raise NotImplementedError + + # Instantiate Strategy + if STRATEGY == 'ddp': + if (not torch.cuda.is_available() + or not torch.cuda.device_count() > 1): + raise RuntimeError('Resources unavailable') + + strategy = DDPStrategy(cluster=None, backend='nccl') + elif STRATEGY == 'horovod': + strategy = HorovodStrategy() + else: + raise NotImplementedError + + # CLIENT CODE + # Launch training from launcher + launcher.run(func=trainer_entrypoint_fn, args=("foobar", strategy)) diff --git a/experimental/example_2.py b/experimental/example_2.py new file mode 100644 index 00000000..14685753 --- /dev/null +++ b/experimental/example_2.py @@ -0,0 +1,107 @@ +""" +Unified interface for launchers. +Most of the complexity is hidden inside "factory" classes. +""" + +import os + +import torch +from torch import nn +from torch.utils.data import DataLoader, Dataset + +from strategy import Strategy, DDPStrategy, HorovodStrategy +from launcher_factory import ( + LauncherFactory, + SimpleLauncherFactory, + TorchElasticLauncherFactory +) + + +class UniformRndDataset(Dataset): + def __init__(self, x_size: int, y_size: int, len: int = 100): + super().__init__() + self.x_size = x_size + self.y_size = y_size + self.len = len + + def __len__(self): + return self.len + + def __getitem__(self, index): + return torch.rand(self.x_size), torch.rand(self.y_size) + + +def trainer_entrypoint_fn(a, strategy: Strategy): + """Dummy training function.""" + strategy.setup() + print(f"{a}: {os.environ.get('RANK')} {os.environ.get('LOCAL_RANK')} " + f"{os.environ.get('MASTER_ADDR')} {os.environ.get('MASTER_PORT')}") + + # Local model + model = nn.Linear(3, 4) + optim = torch.optim.Adam(model.parameters(), lr=1e-3) + loss_fn = nn.MSELoss() + # Distributed model + model: nn.Module = strategy.distribute_model(model) + optim: torch.optim.Optimizer = strategy.distribute_optimizer(optim) + + # Data + train_set = UniformRndDataset(x_size=3, y_size=4) + train_loader = DataLoader(train_set, batch_size=10, num_workers=1) + # Distributed dataloader + train_loader: DataLoader = strategy.distribute_dataloader(train_loader) + + for epoch in range(2): + for (x, y) in train_loader: + # print(f"tensor to cuda:{strategy.device}") + x = x.to(strategy.device) + y = y.to(strategy.device) + + optim.zero_grad() + y_pred = model(x) + loss = loss_fn(y_pred, y) + loss.backward() + optim.step() + + if strategy.is_main_worker(): + print(f"Loss [epoch={epoch}]: {loss.item()}") + + strategy.teardown() + return 123 + + +LAUNCHER = 'torch-elastic' +STRATEGY = 'ddp' +NPROC_PRE_NODE = 4 + +if __name__ == "__main__": + # STRATEGY BUILDER + + # Instantiate Launcher Factory + if LAUNCHER == 'torch-elastic': + launcher_builder: LauncherFactory = TorchElasticLauncherFactory() + elif LAUNCHER == 'simple-launcher': + launcher_builder: LauncherFactory = SimpleLauncherFactory() + else: + raise NotImplementedError + + # Instantiate launcher + launcher = launcher_builder.createLauncher( + n_workers_per_node=NPROC_PRE_NODE + ) + + # Instantiate Strategy + if STRATEGY == 'ddp': + if (not torch.cuda.is_available() + or not torch.cuda.device_count() > 1): + raise RuntimeError('Resources unavailable') + + strategy = DDPStrategy(cluster=None, backend='nccl') + elif STRATEGY == 'horovod': + strategy = HorovodStrategy() + else: + raise NotImplementedError + + # CLIENT CODE + # Launch training from launcher + launcher.run(func=trainer_entrypoint_fn, args=("foobar", strategy)) diff --git a/experimental/example_3.py b/experimental/example_3.py new file mode 100644 index 00000000..d38dd78c --- /dev/null +++ b/experimental/example_3.py @@ -0,0 +1,77 @@ +""" +Hide the selection of launcher and strategy inside a class. +""" +import os + +import torch +from torch import nn +from torch.utils.data import DataLoader, Dataset + +from strategy import Strategy +from distributed_tools import DistributedTooling + + +class UniformRndDataset(Dataset): + def __init__(self, x_size: int, y_size: int, len: int = 100): + super().__init__() + self.x_size = x_size + self.y_size = y_size + self.len = len + + def __len__(self): + return self.len + + def __getitem__(self, index): + return torch.rand(self.x_size), torch.rand(self.y_size) + + +def trainer_entrypoint_fn(a, strategy: Strategy): + """Dummy training function.""" + strategy.setup() + print(f"{a}: {os.environ.get('RANK')} {os.environ.get('LOCAL_RANK')} " + f"{os.environ.get('MASTER_ADDR')} {os.environ.get('MASTER_PORT')}") + + # Local model + model = nn.Linear(3, 4) + optim = torch.optim.Adam(model.parameters(), lr=1e-3) + loss_fn = nn.MSELoss() + # Distributed model + model: nn.Module = strategy.distribute_model(model) + optim: torch.optim.Optimizer = strategy.distribute_optimizer(optim) + + # Data + train_set = UniformRndDataset(x_size=3, y_size=4) + train_loader = DataLoader(train_set, batch_size=10, num_workers=1) + # Distributed dataloader + train_loader: DataLoader = strategy.distribute_dataloader(train_loader) + + for epoch in range(2): + for (x, y) in train_loader: + # print(f"tensor to cuda:{strategy.device}") + x = x.to(strategy.device) + y = y.to(strategy.device) + + optim.zero_grad() + y_pred = model(x) + loss = loss_fn(y_pred, y) + loss.backward() + optim.step() + + if strategy.is_main_worker(): + print(f"Loss [epoch={epoch}]: {loss.item()}") + + strategy.teardown() + return 123 + + +STRATEGY = 'ddp' +NPROC_PRE_NODE = 4 + + +if __name__ == "__main__": + dist_tools = DistributedTooling(n_workers_per_node=NPROC_PRE_NODE) + launcher, strategy = dist_tools.getTools('ddp') + + # CLIENT CODE + # Launch training from launcher + launcher.run(func=trainer_entrypoint_fn, args=("foobar", strategy)) diff --git a/experimental/launcher.py b/experimental/launcher.py index 4670d27e..d9733b8f 100644 --- a/experimental/launcher.py +++ b/experimental/launcher.py @@ -1,4 +1,6 @@ import datetime +import os +import shutil import abc import time import uuid @@ -15,24 +17,19 @@ C10dRendezvousBackend ) from torch.distributed import TCPStore -from torch.distributed.elastic.multiprocessing import Std +from torch.distributed.elastic.multiprocessing import Std, start_processes from torch.distributed.launcher.api import LaunchConfig, elastic_launch from torch.distributed.run import config_from_args -# from lightning.pytorch.plugins.environments import ( -# ClusterEnvironment, SLURMEnvironment, -# TorchElasticEnvironment, LightningEnvironment -# ) -# from torch.distributed.argparse_util import check_env, env -from cluster import ClusterEnvironment +from cluster import ClusterEnvironment, detect_cluster class Launcher(abc.ABC): cluster: ClusterEnvironment @abc.abstractmethod - def run(*args): + def run(self, *args) -> Any: """Launches the distributed execution.""" @@ -41,14 +38,16 @@ class DummyTorchElasticLauncher(Launcher): def __init__( self, - cluster: ClusterEnvironment, + cluster: Optional[ClusterEnvironment] = None, n_workers_per_node: int = 1, min_nodes: int = 1, max_nodes: int = 1, max_restarts: int = 1 ) -> None: super().__init__() - self.cluster = cluster + # detect_cluster() is preferred + self.cluster = cluster if cluster is not None else detect_cluster() + print(f"DummyTorchElasticLauncher with cluster '{self.cluster}'") self.n_workers_per_node = n_workers_per_node self.min_nodes = min_nodes self.max_nodes = max_nodes @@ -85,7 +84,7 @@ def run( redirect: bool = False, log_dir: str = 'launcher_logs', tee_ranks: Union[str, int, List[int]] = None - ) -> Any: + ) -> List[Any]: """Launches the distributed execution with Torch Elastic.""" # Suppress all printing to console: # redirects={0: Std.ALL} # do no print, but save to file. @@ -203,7 +202,7 @@ def run( self, func: Callable, args: Tuple = () - ): + ) -> Any: if self.standalone: self.rdzv_backend = "c10d" self.rdzv_endpoint = "localhost:29400" @@ -225,8 +224,72 @@ def run( class SimpleLauncher(Launcher): - """Simple launcher based on multiprocessing.""" + """Simple launcher based on multiprocessing. + Use ONLY for single node applications. + """ + + def __init__( + self, + nproc_per_node: int, + run_id: Optional[str] = None, + master_addr: str = "127.0.0.1", + master_port: int = 29500 + ) -> None: + super().__init__() + self.nproc_per_node = nproc_per_node + self.run_id = run_id if run_id is not None else f"RunID:{time.time()}" + self.master_addr = master_addr + self.master_port = master_port + self.log_dir = f'{self.__class__.__name__}_logs' + if os.path.exists(self.log_dir): + shutil.rmtree(self.log_dir) + os.makedirs(self.log_dir) + + def run( + self, + func: Callable, + args: Tuple = () + ) -> Any: + # Adapted from: + # https://pytorch.org/docs/stable/elastic/multiprocessing.html + w_args = {i: args for i in range(self.nproc_per_node)} + # Emulates the env variables set by torch Elastic + w_envs = { + i: dict( + RANK=str(i), + LOCAL_RANK=str(i), + GROUP_RANK=str(0), + ROLE_RANK=str(i), + WORLD_SIZE=str(self.nproc_per_node), + LOCAL_WORLD_SIZE=str(self.nproc_per_node), + ROLE_WORLD_SIZE=str(self.nproc_per_node), + TORCHELASTIC_RUN_ID=str(self.run_id), + MASTER_ADDR=str(self.master_addr), + MASTER_PORT=str(self.master_port) + ) + for i in range(self.nproc_per_node) + } + ctx = start_processes( + name=self.__class__.__name__, + entrypoint=func, + args=w_args, + envs=w_envs, + log_dir=self.log_dir + ) + ctx.wait() + return ctx.return_values class DeepSpeedLauncher(Launcher): """Official DeepSpeed launcher.""" + + def __init__(self) -> None: + super().__init__() + + def run( + self, + func: Callable, + args: Tuple = () + ) -> Any: + # TODO: complete + raise NotImplementedError diff --git a/experimental/launcher_factory.py b/experimental/launcher_factory.py new file mode 100644 index 00000000..fce12a0c --- /dev/null +++ b/experimental/launcher_factory.py @@ -0,0 +1,144 @@ +""" +Factories to instantiate Launcher classes. +They introduce a level of indirection to provide a unified interface +for all the launchers. The common interface is provided by the +`createLauncher` factory method. +""" + +from typing import Optional, Dict, Any +import abc + +from launcher import ( + Launcher, + TorchElasticLauncher, + SimpleLauncher, + DeepSpeedLauncher +) +from cluster import detect_cluster + + +class LauncherFactory(abc.ABC): + """ + Factory class to instantiate a Launcher classes. + It introduces a level of indirection to provide a unified interface + for all the launchers. The common interface is provided by the + `createLauncher` factory method. + """ + + def createLauncher( + self, + n_workers_per_node: int, + run_id: Optional[str] = None, + master_addr: Optional[str] = None, + master_port: Optional[int] = None, + **kwargs + ) -> Launcher: + """ + Simplifies the instantiation of a Launcher. + Advanced configuration is pre-computed in the body + of this method, leaving few parameters to the end user. + """ + + +class TorchElasticLauncherFactory(LauncherFactory): + """Factory class to instantiate a TorchElasticLauncher class.""" + + def createLauncher( + self, + n_workers_per_node: int, + run_id: Optional[str] = None, + master_addr: Optional[str] = None, + master_port: Optional[int] = None, + **kwargs + ) -> Launcher: + """ + Simplifies the instantiation of a TorchElasticLauncher. + Advanced configuration is pre-computed in the body + of this method, leaving few parameters to the end user. + """ + cluster = detect_cluster() + + kwargs['nproc_per_node'] = n_workers_per_node + # If given, propagate the args + if run_id: + kwargs['rdzv_id'] = run_id + if master_addr: + kwargs['master_addr'] = master_addr + if master_port: + kwargs['master_port'] = master_port + + # Compute and add TorchElastic specific args, if not + # provided as **kwargs + n_nodes = cluster.num_nodes() + safe_add(kwargs, 'nnodes', f"{n_nodes}:{n_nodes}") + safe_add(kwargs, 'rdzv_id', cluster.job_id()) + is_host_flag = '1' if cluster.node_rank() == 0 else '0' + safe_add(kwargs, 'rdzv_conf', f'is_host={is_host_flag}') + safe_add(kwargs, 'rdzv_backend', 'c10d') + safe_add( + kwargs, + 'rdzv_endpoint', + f'{cluster.main_address}:{cluster.main_port}' + ) + safe_add(kwargs, 'max_restarts', 3) + + return TorchElasticLauncher(**kwargs) + + +class SimpleLauncherFactory(LauncherFactory): + """Factory class to instantiate a SimpleLauncherFactory class.""" + + def createLauncher( + self, + n_workers_per_node: int, + run_id: Optional[str] = None, + master_addr: Optional[str] = None, + master_port: Optional[int] = None, + **kwargs + ) -> Launcher: + """ + Simplifies the instantiation of a SimpleLauncher. + Advanced configuration is pre-computed in the body + of this method, leaving few parameters to the end user. + """ + + kwargs['nproc_per_node'] = n_workers_per_node + # If given, propagate the args + if run_id: + kwargs['run_id'] = run_id + if master_addr: + kwargs['master_addr'] = master_addr + if master_port: + kwargs['master_port'] = master_port + + return SimpleLauncher(**kwargs) + + +class DeepSpeedLauncherFactory(LauncherFactory): + """Factory class to instantiate a DeepSpeedLauncher class.""" + + def createLauncher( + self, + n_workers_per_node: int, + run_id: Optional[str] = None, + master_addr: Optional[str] = None, + master_port: Optional[int] = None, + **kwargs + ) -> Launcher: + """ + Simplifies the instantiation of a DeepSpeedLauncher. + Advanced configuration is pre-computed in the body + of this method, leaving few parameters to the end user. + """ + # TODO: complete + raise NotImplementedError + return DeepSpeedLauncher(...) + + +def safe_add(map: Dict, key: str, value: Any) -> None: + """ + Add a key-value pair to a dict if the key + is not already present. + """ + if map.get(key) is None: + map[key] = value diff --git a/experimental/strategy.py b/experimental/strategy.py index 60923400..8fad3429 100644 --- a/experimental/strategy.py +++ b/experimental/strategy.py @@ -1,6 +1,6 @@ import os import abc -from typing import Any +from typing import Any, Optional import torch from torch import nn @@ -10,7 +10,7 @@ from torch.distributed import init_process_group # from lightning.pytorch.plugins.environments import ClusterEnvironment -from cluster import ClusterEnvironment +from cluster import ClusterEnvironment, detect_cluster class Strategy(abc.ABC): @@ -53,8 +53,8 @@ def distribute_dataloader(self, dataloader: Any) -> Any: class DDPStrategy(Strategy): def __init__( self, - cluster: ClusterEnvironment, - backend: str = 'nccl' + backend: str = 'nccl', + cluster: Optional[ClusterEnvironment] = None ) -> None: super().__init__() self.cluster = cluster @@ -71,6 +71,11 @@ def setup(self) -> None: raise RuntimeError( "Distributed environment not setup correctly. Use a launcher.") + # detect_cluster() is preferred + if self.cluster is None: + self.cluster = detect_cluster() + print(f"DDPStrategy executed on '{self.cluster}' cluster") + # Initializes the default distributed process group # and the distributed package init_process_group(backend=self.backend)