From a140dd0c0a8ac20a76c5046755928160a79a9b55 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jarl=20Sondre=20S=C3=A6ther?=
 <60541573+jarlsondre@users.noreply.github.com>
Date: Mon, 16 Dec 2024 17:51:23 +0100
Subject: [PATCH] more clean up [WIP]

---
 .../torch-scaling-test/ddp_trainer.py         |  48 +++---
 .../torch-scaling-test/deepspeed_trainer.py   | 139 +-----------------
 .../torch-scaling-test/horovod_trainer.py     |  17 +--
 .../torch-scaling-test/utils.py               |  15 +-
 4 files changed, 35 insertions(+), 184 deletions(-)

diff --git a/tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py b/tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py
index 00e0f6f1..6f1ed73a 100755
--- a/tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py
+++ b/tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py
@@ -21,46 +21,45 @@ import torchvision
 from torch.utils.data import DataLoader
 from torch.utils.data.distributed import DistributedSampler
 
-from utils import imagenet_dataset, parse_params, train_epoch
+from utils import imagenet_dataset, get_parser, train_epoch
 
 from itwinai.loggers import EpochTimeTracker
 from itwinai.torch.reproducibility import seed_worker, set_seed
 
 
 def main():
-    args = parse_params()
+    parser = get_parser()
+    args = parser.parse_args()
+
     subset_size = 5000
     use_cuda = not args.no_cuda and torch.cuda.is_available()
     is_distributed = use_cuda and torch.cuda.device_count() > 0
+    torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)
 
-    if is_distributed:
+    train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)
+    if not is_distributed:
+        local_world_size = 1
+        global_rank = 0
+        local_rank = 0
+
+        train_loader = DataLoader(
+            train_dataset,
+            batch_size=args.batch_size,
+            generator=torch_seed,
+            worker_init_fn=seed_worker,
+        )
+    else:
         # Initializing the distribution backend
         dist.init_process_group(backend=args.backend)
         local_world_size = torch.cuda.device_count()
         global_rank = dist.get_rank()
         local_rank = dist.get_rank() % local_world_size
-    else:
-        local_world_size = 1
-        global_rank = 0
-        local_rank = 0
-
-    # Set random seed for reproducibility
-    torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)
 
-    device = torch.device("cuda" if use_cuda else "cpu", local_rank)
-    if use_cuda:
-        torch.cuda.set_device(local_rank)
-
-    # Dataset
-    train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)
-    if is_distributed:
-        # Distributed training requires a distributed sampler to split the data
-        # between the workers
+        # Creating the distributed sampler and dataloader
         shuffle: bool = args.shuff and args.rnd_seed is None
         pin_memory = True
         persistent_workers = args.nworker > 1
-
         train_sampler = DistributedSampler(train_dataset, shuffle=shuffle)
         train_loader = DataLoader(
             train_dataset,
@@ -73,14 +72,8 @@ def main():
             generator=torch_seed,
             worker_init_fn=seed_worker,
         )
-    else:
-        train_loader = DataLoader(
-            train_dataset,
-            batch_size=args.batch_size,
-            generator=torch_seed,
-            worker_init_fn=seed_worker,
-        )
 
+    device = torch.device("cuda", local_rank) if use_cuda else torch.device("cpu")
     model = torchvision.models.resnet152().to(device)
 
     # Distributing the model to the workers
@@ -127,4 +120,3 @@ def main():
 
 if __name__ == "__main__":
     main()
-    sys.exit()
diff --git a/tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py b/tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py
index c34ffd8d..1fe6d017 100644
--- a/tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py
+++ b/tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py
@@ -9,12 +9,10 @@
 
 """Scaling test of Microsoft Deepspeed on Imagenet using Resnet."""
 
-import argparse
 import os
 import sys
 import time
 from timeit import default_timer as timer
-from typing import Optional
 
 import deepspeed
 import torch
@@ -22,120 +20,17 @@ import torchvision
 from torch.utils.data import DataLoader
 from torch.utils.data.distributed import DistributedSampler
 
-from utils import imagenet_dataset, parse_params, train_epoch
+from utils import imagenet_dataset, get_parser, train_epoch
 
 from itwinai.loggers import EpochTimeTracker
-from itwinai.parser import ArgumentParser as ItwinaiArgParser
 from itwinai.torch.reproducibility import seed_worker, set_seed
 
 
-def parse_params():
-    parser = ItwinaiArgParser(description="PyTorch Imagenet scaling test")
-
-    # Data and logging
-    parser.add_argument(
-        "--data-dir",
-        default="./",
-        help=("location of the training dataset in the " "local filesystem"),
-    )
-    parser.add_argument(
-        "--log-int",
-        type=int,
-        default=10,
-        help="log interval per training. Disabled if < 0.",
-    )
-    parser.add_argument(
-        "--verbose",
-        action=argparse.BooleanOptionalAction,
-        help="Print parsed arguments",
-    )
-    parser.add_argument(
-        "--nworker",
-        type=int,
-        default=0,
-        help=("number of workers in DataLoader " "(default: 0 - only main)"),
-    )
-    parser.add_argument(
-        "--prefetch",
-        type=int,
-        default=2,
-        help="prefetch data in DataLoader (default: 2)",
-    )
-
-    # Model
-    parser.add_argument(
-        "--batch-size",
-        type=int,
-        default=64,
-        metavar="N",
-        help="input batch size for training (default: 64)",
-    )
-    parser.add_argument(
-        "--epochs",
-        type=int,
-        default=10,
-        metavar="N",
-        help="number of epochs to train (default: 10)",
-    )
-    parser.add_argument(
-        "--lr",
-        type=float,
-        default=0.01,
-        metavar="LR",
-        help="learning rate (default: 0.01)",
-    )
-    parser.add_argument(
-        "--momentum",
-        type=float,
-        default=0.5,
-        help="momentum in SGD optimizer (default: 0.5)",
-    )
-    parser.add_argument(
-        "--shuff",
-        action="store_true",
-        default=False,
-        help="shuffle dataset (default: False)",
-    )
-
-    # Reproducibility
-    parser.add_argument(
-        "--rnd-seed",
-        type=Optional[int],
-        default=None,
-        help="seed integer for reproducibility (default: 0)",
-    )
-
-    # Distributed ML
-    parser.add_argument(
-        "--backend",
-        type=str,
-        default="nccl",
-        metavar="N",
-        help="backend for parallelization (default: nccl)",
-    )
-    parser.add_argument(
-        "--no-cuda", action="store_true", default=False, help="disables GPGPUs"
-    )
-    parser.add_argument(
-        "--local_rank",
-        type=int,
-        default=-1,
-        help="local rank passed from distributed launcher",
-    )
-
-    # parse to deepspeed
-    parser = deepspeed.add_config_arguments(parser)
-    args = parser.parse_args()
-    if args.verbose:
-        args_list = [f"{key}: {val}" for key, val in args.items()]
-        print("PARSED ARGS:\n", "\n".join(args_list))
-
-    return args
-
-
 def main():
     # Parse CLI args
-    args = parse_params()
+    parser = get_parser()
+    parser = deepspeed.add_config_arguments(parser)
+    args = parser.parse_args()
 
     # Check resources availability
     use_cuda = not args.no_cuda and torch.cuda.is_available()
@@ -149,44 +44,22 @@ def main():
     # Start the timer for profiling
     st = timer()
 
-    # Initializes the distributed backend
-    if is_distributed:
-        deepspeed.init_distributed(dist_backend=args.backend)
-
     # Set random seed for reproducibility
     torch_prng = set_seed(args.rnd_seed, deterministic_cudnn=False)
 
     if is_distributed:
+        deepspeed.init_distributed(dist_backend=args.backend)
+
         # Get job rank info - rank==0 master gpu
-        gwsize = dist.get_world_size()  # global world size - per run
         lwsize = torch.cuda.device_count()  # local world size - per node
         grank = dist.get_rank()  # global rank - assign per run
         lrank = dist.get_rank() % lwsize  # local rank - assign per node
     else:
         # Use a single worker (either on GPU or CPU)
         lwsize = 1
-        gwsize = 1
         grank = 0
         lrank = 0
 
-    if grank == 0:
-        print("TIMER: initialise:", timer() - st, "s")
-        print("DEBUG: local ranks:", lwsize, "/ global ranks:", gwsize)
-        print("DEBUG: sys.version:", sys.version)
-        print("DEBUG: args.data_dir:", args.data_dir)
-        print("DEBUG: args.log_int:", args.log_int)
-        print("DEBUG: args.nworker:", args.nworker)
-        print("DEBUG: args.prefetch:", args.prefetch)
-        print("DEBUG: args.batch_size:", args.batch_size)
-        print("DEBUG: args.epochs:", args.epochs)
-        print("DEBUG: args.lr:", args.lr)
-        print("DEBUG: args.momentum:", args.momentum)
-        print("DEBUG: args.shuff:", args.shuff)
-        print("DEBUG: args.rnd_seed:", args.rnd_seed)
-        print("DEBUG: args.backend:", args.backend)
-        print("DEBUG: args.local_rank:", args.local_rank)
-        print("DEBUG: args.no_cuda:", args.no_cuda, "\n")
-
     # Encapsulate the model on the GPU assigned to the current process
     if use_cuda:
         torch.cuda.set_device(lrank)
diff --git a/tutorials/distributed-ml/torch-scaling-test/horovod_trainer.py b/tutorials/distributed-ml/torch-scaling-test/horovod_trainer.py
index 479da6ca..fda73a9e 100755
--- a/tutorials/distributed-ml/torch-scaling-test/horovod_trainer.py
+++ b/tutorials/distributed-ml/torch-scaling-test/horovod_trainer.py
@@ -11,8 +11,6 @@
 """Scaling test of Horovod on Imagenet using Resnet."""
 
 import os
-import sys
-import time
 from timeit import default_timer as timer
 
 import horovod.torch as hvd
@@ -21,7 +19,7 @@ import torchvision
 from torch.utils.data import DataLoader
 from torch.utils.data.distributed import DistributedSampler
 
-from utils import imagenet_dataset, parse_params, train_epoch
+from utils import imagenet_dataset, get_parser, train_epoch
 
 from itwinai.loggers import EpochTimeTracker
 from itwinai.torch.reproducibility import seed_worker, set_seed
@@ -29,7 +27,8 @@
 
 def main():
     # Parse CLI args
-    args = parse_params()
+    parser = get_parser()
+    args = parser.parse_args()
 
     # Check resources availability
     use_cuda = not args.no_cuda and torch.cuda.is_available()
@@ -37,6 +36,7 @@ def main():
     torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)
     train_dataset = imagenet_dataset(args.data_dir)
 
+    # Setting rank and world-size variables
     if not is_distributed:
         # Use a single worker (either on GPU or CPU)
         local_rank = 0
@@ -75,7 +75,7 @@ def main():
         train_dataset,
         num_replicas=global_world_size,
         rank=global_rank,
-        shuffle=shuffle
+        shuffle=shuffle,
     )
     train_loader = DataLoader(
         train_dataset,
@@ -117,7 +117,7 @@ def main():
     )
 
     if global_rank == 0:
-        num_nodes = os.environ.get("SLURM_NNODES", "unk")
+        num_nodes = os.environ.get("SLURM_NNODES", "1")
         epoch_time_tracker = EpochTimeTracker(
             strategy_name="horovod-bl",
             save_path=f"epochtime_horovod-bl_{num_nodes}N.csv",
@@ -127,6 +127,7 @@ def main():
     start_time = timer()
     for epoch_idx in range(args.epochs):
         epoch_start_time = timer()
+
         if is_distributed:
             train_sampler.set_epoch(epoch_idx)
 
@@ -147,10 +148,6 @@ def main():
     total_time = timer() - start_time
     print(f"Training finished - took {total_time:.2f}s")
 
-    time.sleep(1)
-    print(f" - TRAINING FINISHED")
-
 
 if __name__ == "__main__":
     main()
-    sys.exit()
diff --git a/tutorials/distributed-ml/torch-scaling-test/utils.py b/tutorials/distributed-ml/torch-scaling-test/utils.py
index 2dfb6e28..bf45fc4f 100644
--- a/tutorials/distributed-ml/torch-scaling-test/utils.py
+++ b/tutorials/distributed-ml/torch-scaling-test/utils.py
@@ -69,7 +69,7 @@ def train_epoch(
     return total_loss
 
 
-def parse_params():
+def get_parser() -> ItwinaiArgParser:
     parser = ItwinaiArgParser(description="PyTorch Imagenet scaling test")
 
     parser.add_argument(
@@ -83,11 +83,6 @@
         default=10,
         help="log interval per training. Disabled if < 0.",
     )
-    parser.add_argument(
-        "--verbose",
-        action=argparse.BooleanOptionalAction,
-        help="Print parsed arguments",
-    )
     parser.add_argument(
         "--nworker",
         type=int,
@@ -139,10 +134,4 @@
     parser.add_argument(
         "--no-cuda", action="store_true", default=False, help="disables GPGPUs"
     )
-
-    args = parser.parse_args()
-
-    if args.verbose:
-        args_list = [f"{key}: {val}" for key, val in args.items()]
-        print("PARSED ARGS:\n", "\n".join(args_list))
-    return args
+    return parser
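
Note: the parse_params() -> get_parser() split above is what lets each trainer
compose extra flags before parsing, instead of parsing inside utils.py. Below
is a minimal sketch of the pattern, assuming itwinai's ArgumentParser is
argparse-compatible; parse_deepspeed_args is a hypothetical helper name, and
the argv parameter and final print are illustrative only:

    import deepspeed

    from utils import get_parser  # shared scaling-test parser from utils.py


    def parse_deepspeed_args(argv=None):
        # Hypothetical helper: build the shared parser, let DeepSpeed append
        # its own flags (--deepspeed, --deepspeed_config, ...) via its public
        # add_config_arguments() helper, then parse exactly once here.
        parser = get_parser()
        parser = deepspeed.add_config_arguments(parser)
        return parser.parse_args(argv)


    if __name__ == "__main__":
        args = parse_deepspeed_args()
        print(f"batch_size={args.batch_size}, epochs={args.epochs}")

Because parsing now happens at the call site, the DDP and Horovod trainers can
simply call get_parser().parse_args(), and the DeepSpeed-only flags never leak
into the other scripts.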