diff --git a/tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py b/tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py index 6f1ed73a..5297bda1 100755 --- a/tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py +++ b/tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py @@ -12,7 +12,6 @@ """Scaling test of torch Distributed Data Parallel on Imagenet using Resnet.""" import os -import sys from timeit import default_timer as timer import torch @@ -31,7 +30,7 @@ def main(): parser = get_parser() args = parser.parse_args() - subset_size = 5000 + subset_size = 5000 # limit number of examples from imagenet use_cuda = not args.no_cuda and torch.cuda.is_available() is_distributed = use_cuda and torch.cuda.device_count() > 0 torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False) diff --git a/tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py b/tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py index 1fe6d017..49bf8794 100644 --- a/tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py +++ b/tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py @@ -10,10 +10,9 @@ """Scaling test of Microsoft Deepspeed on Imagenet using Resnet.""" import os -import sys -import time from timeit import default_timer as timer + import deepspeed import torch import torch.distributed as dist @@ -23,7 +22,7 @@ from utils import imagenet_dataset, get_parser, train_epoch from itwinai.loggers import EpochTimeTracker -from itwinai.torch.reproducibility import seed_worker, set_seed +from itwinai.torch.reproducibility import set_seed def main(): @@ -33,71 +32,56 @@ def main(): args = parser.parse_args() # Check resources availability + subset_size = 5000 # limit number of examples from imagenet use_cuda = not args.no_cuda and torch.cuda.is_available() - is_distributed = False - if use_cuda and torch.cuda.device_count() > 0: - is_distributed = True - - # Limit # of CPU threads to be used per worker - # torch.set_num_threads(1) + is_distributed = use_cuda and torch.cuda.device_count() > 0 + torch_prng = set_seed(args.rnd_seed, deterministic_cudnn=False) - # Start the timer for profiling st = timer() - # Set random seed for reproducibility - torch_prng = set_seed(args.rnd_seed, deterministic_cudnn=False) - + train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size) if is_distributed: - deepspeed.init_distributed(dist_backend=args.backend) - # Get job rank info - rank==0 master gpu - lwsize = torch.cuda.device_count() # local world size - per node - grank = dist.get_rank() # global rank - assign per run - lrank = dist.get_rank() % lwsize # local rank - assign per node - else: - # Use a single worker (either on GPU or CPU) - lwsize = 1 - grank = 0 - lrank = 0 - # Encapsulate the model on the GPU assigned to the current process - if use_cuda: - torch.cuda.set_device(lrank) + local_world_size = torch.cuda.device_count() + global_rank = dist.get_rank() + local_rank = dist.get_rank() % local_world_size - # Read training dataset - train_dataset = imagenet_dataset(args.data_dir) + shuffle = args.shuff and args.rnd_seed is None + # pin_memory=True + # persistent_workers = args.nworker > 1 - if is_distributed: - # Distributed sampler restricts data loading to a subset of the dataset - # exclusive to the current process. - # `mun_replicas` and `rank` are automatically retrieved from - # the current distributed group. 
        train_sampler = DistributedSampler(
-            train_dataset,  # num_replicas=gwsize, rank=grank,
-            shuffle=(args.shuff and args.rnd_seed is None),
-        )
-
-        train_loader = DataLoader(
             train_dataset,
-            batch_size=args.batch_size,
-            sampler=train_sampler,
-            num_workers=args.nworker,
-            pin_memory=True,
-            persistent_workers=(args.nworker > 1),
-            prefetch_factor=args.prefetch,
-            generator=torch_prng,
-            worker_init_fn=seed_worker,
+            shuffle=shuffle,
         )
+        # train_loader = DataLoader(
+        #     train_dataset,
+        #     batch_size=args.batch_size,
+        #     sampler=train_sampler,
+        #     num_workers=args.nworker,
+        #     pin_memory=pin_memory,
+        #     persistent_workers=persistent_workers,
+        #     prefetch_factor=args.prefetch,
+        #     generator=torch_prng,
+        #     worker_init_fn=seed_worker,
+        # )
     else:
-        train_loader = DataLoader(
-            train_dataset,
-            batch_size=args.batch_size,
-            generator=torch_prng,
-            worker_init_fn=seed_worker,
-        )
+        # Use a single worker (either on GPU or CPU)
+        local_world_size = 1
+        global_rank = 0
+        local_rank = 0
+        # train_loader = DataLoader(
+        #     train_dataset,
+        #     batch_size=args.batch_size,
+        #     generator=torch_prng,
+        #     worker_init_fn=seed_worker,
+        # )
+
+    device = torch.device("cuda" if use_cuda else "cpu", local_rank)
 
     # Create CNN model
-    model = torchvision.models.resnet152()
+    model = torchvision.models.resnet152().to(device)
 
     # Initialize DeepSpeed and get:
     # 1) Distributed model
@@ -121,21 +105,17 @@
     )
 
     # Start training loop
-    if grank == 0:
-        print("TIMER: broadcast:", timer() - st, "s")
-        print("\nDEBUG: start training")
-        print("--------------------------------------------------------")
-        nnod = os.environ.get("SLURM_NNODES", "unk")
+    if global_rank == 0:
+        num_nodes = os.environ.get("SLURM_NNODES", "1")
         epoch_time_tracker = EpochTimeTracker(
             strategy_name="deepspeed-bl",
-            save_path=f"epochtime_deepspeed-bl_{nnod}N.csv",
-            num_nodes=int(nnod),
+            save_path=f"epochtime_deepspeed-bl_{num_nodes}N.csv",
+            num_nodes=int(num_nodes),
         )
 
-    et = timer()
     start_epoch = 1
     for epoch in range(start_epoch, args.epochs + 1):
-        lt = timer()
+        epoch_start_time = timer()
         if is_distributed:
             # Inform the sampler that a new epoch started: shuffle
             # may be needed
@@ -143,49 +123,17 @@
 
         # Training
         train_epoch(
-            model=distrib_model, device=device, train_loader=train_loader, optimizer=optimizer
+            model=distrib_model, device=device, train_loader=deepspeed_train_loader, optimizer=optimizer
         )
 
-        # Save first epoch timer
-        if epoch == start_epoch:
-            first_ep_t = timer() - lt
-
-        # Final epoch
-        if epoch + 1 == args.epochs:
-            train_loader.last_epoch = True
-
-        if grank == 0:
-            print("TIMER: epoch time:", timer() - lt, "s")
-            epoch_time_tracker.add_epoch_time(epoch - 1, timer() - lt)
+        if global_rank == 0:
+            epoch_elapsed_time = timer() - epoch_start_time
+            epoch_time_tracker.add_epoch_time(epoch - 1, epoch_elapsed_time)
+            print(f"[{epoch}/{args.epochs}] - time: {epoch_elapsed_time:.2f}s")
 
         if is_distributed:
             dist.barrier()
 
-    if grank == 0:
-        print("\n--------------------------------------------------------")
-        print("DEBUG: results:\n")
-        print("TIMER: first epoch time:", first_ep_t, " s")
-        print("TIMER: last epoch time:", timer() - lt, " s")
-        print("TIMER: average epoch time:", (timer() - et) / args.epochs, " s")
-        print("TIMER: total epoch time:", timer() - et, " s")
-        if epoch > 1:
-            print("TIMER: total epoch-1 time:", timer() - et - first_ep_t, " s")
-            print(
-                "TIMER: average epoch-1 time:",
-                (timer() - et - first_ep_t) / (args.epochs - 1),
-                " s",
-            )
-        if use_cuda:
-            print(
-                "DEBUG: memory req:",
req:", - int(torch.cuda.memory_reserved(lrank) / 1024 / 1024), - "MB", - ) - print("DEBUG: memory summary:\n\n", torch.cuda.memory_summary(0)) - print(f"TIMER: final time: {timer()-st} s\n") - - time.sleep(1) - print(f" - TRAINING FINISHED") # Clean-up if is_distributed: @@ -194,4 +142,3 @@ def main(): if __name__ == "__main__": main() - sys.exit()