Skip to content

Commit

Permalink
more clean up [WIP]
Browse files Browse the repository at this point in the history
  • Loading branch information
jarlsondre committed Dec 16, 2024
1 parent 65a6334 commit a140dd0
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 184 deletions.
48 changes: 20 additions & 28 deletions tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,45 @@
import torchvision
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from utils import imagenet_dataset, parse_params, train_epoch
from utils import imagenet_dataset, get_parser, train_epoch

from itwinai.loggers import EpochTimeTracker
from itwinai.torch.reproducibility import seed_worker, set_seed


def main():
args = parse_params()
parser = get_parser()
args = parser.parse_args()

subset_size = 5000
use_cuda = not args.no_cuda and torch.cuda.is_available()
is_distributed = use_cuda and torch.cuda.device_count() > 0
torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)

if is_distributed:
train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)
if not is_distributed:
local_world_size = 1
global_rank = 0
local_rank = 0

train_loader = DataLoader(
train_dataset,
batch_size=args.batch_size,
generator=torch_seed,
worker_init_fn=seed_worker,
)
else:
# Initializing the distribution backend
dist.init_process_group(backend=args.backend)

local_world_size = torch.cuda.device_count()
global_rank = dist.get_rank()
local_rank = dist.get_rank() % local_world_size
else:
local_world_size = 1
global_rank = 0
local_rank = 0

# Set random seed for reproducibility
torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)
device = torch.device("cuda" if use_cuda else "cpu", local_rank)
if use_cuda:
torch.cuda.set_device(local_rank)

# Dataset
train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)

if is_distributed:
# Distributed training requires a distributed sampler to split the data
# between the workers
# Creating dataset and dataloader
shuffle: bool = args.shuff and args.rnd_seed is None
pin_memory = True
persistent_workers = args.nworker > 1

train_sampler = DistributedSampler(train_dataset, shuffle=shuffle)
train_loader = DataLoader(
train_dataset,
Expand All @@ -73,14 +72,8 @@ def main():
generator=torch_seed,
worker_init_fn=seed_worker,
)
else:
train_loader = DataLoader(
train_dataset,
batch_size=args.batch_size,
generator=torch_seed,
worker_init_fn=seed_worker,
)

device = torch.device(f"cuda:{local_rank}" if use_cuda else "cpu", local_rank)
model = torchvision.models.resnet152().to(device)

# Distributing the model to the workers
Expand Down Expand Up @@ -127,4 +120,3 @@ def main():

if __name__ == "__main__":
main()
sys.exit()
139 changes: 6 additions & 133 deletions tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,133 +9,28 @@

"""Scaling test of Microsoft Deepspeed on Imagenet using Resnet."""

import argparse
import os
import sys
import time
from timeit import default_timer as timer
from typing import Optional

import deepspeed
import torch
import torch.distributed as dist
import torchvision
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from utils import imagenet_dataset, parse_params, train_epoch
from utils import imagenet_dataset, get_parser, train_epoch

from itwinai.loggers import EpochTimeTracker
from itwinai.parser import ArgumentParser as ItwinaiArgParser
from itwinai.torch.reproducibility import seed_worker, set_seed


def parse_params():
    """Parse command-line arguments for the Deepspeed Imagenet scaling test.

    Builds an ``ItwinaiArgParser`` with data/logging, model, reproducibility
    and distributed-ML options, lets Deepspeed append its own config
    arguments, then parses ``sys.argv``.

    Returns:
        argparse.Namespace: the parsed arguments. When ``--verbose`` is
        given, the parsed key/value pairs are also printed to stdout.
    """
    parser = ItwinaiArgParser(description="PyTorch Imagenet scaling test")

    # Data and logging
    parser.add_argument(
        "--data-dir",
        default="./",
        help="location of the training dataset in the local filesystem",
    )
    parser.add_argument(
        "--log-int",
        type=int,
        default=10,
        help="log interval per training. Disabled if < 0.",
    )
    parser.add_argument(
        "--verbose",
        action=argparse.BooleanOptionalAction,
        help="Print parsed arguments",
    )
    parser.add_argument(
        "--nworker",
        type=int,
        default=0,
        help="number of workers in DataLoader (default: 0 - only main)",
    )
    parser.add_argument(
        "--prefetch",
        type=int,
        default=2,
        help="prefetch data in DataLoader (default: 2)",
    )

    # Model
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=10,
        metavar="N",
        help="number of epochs to train (default: 10)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=0.01,
        metavar="LR",
        help="learning rate (default: 0.01)",
    )
    parser.add_argument(
        "--momentum",
        type=float,
        default=0.5,
        help="momentum in SGD optimizer (default: 0.5)",
    )
    parser.add_argument(
        "--shuff",
        action="store_true",
        default=False,
        help="shuffle dataset (default: False)",
    )

    # Reproducibility
    # NOTE: argparse's `type` must be a callable converter; the previous
    # `type=Optional[int]` raised a TypeError whenever --rnd-seed was passed.
    parser.add_argument(
        "--rnd-seed",
        type=int,
        default=None,
        help="seed integer for reproducibility (default: None)",
    )

    # Distributed ML
    parser.add_argument(
        "--backend",
        type=str,
        default="nccl",
        metavar="N",
        help="backend for parallelization (default: nccl)",
    )
    parser.add_argument(
        "--no-cuda", action="store_true", default=False, help="disables GPGPUs"
    )
    parser.add_argument(
        "--local_rank",
        type=int,
        default=-1,
        help="local rank passed from distributed launcher",
    )

    # Let Deepspeed register its own config arguments on the same parser
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    if args.verbose:
        # Namespace has no .items(); iterate its attribute dict instead.
        args_list = [f"{key}: {val}" for key, val in vars(args).items()]
        print("PARSED ARGS:\n", "\n".join(args_list))

    return args


def main():
# Parse CLI args
args = parse_params()
parser = get_parser()
parser = deepspeed.add_config_arguments(parser)
args = parser.parse_args()

# Check resources availability
use_cuda = not args.no_cuda and torch.cuda.is_available()
Expand All @@ -149,44 +44,22 @@ def main():
# Start the timer for profiling
st = timer()

# Initializes the distributed backend
if is_distributed:
deepspeed.init_distributed(dist_backend=args.backend)

# Set random seed for reproducibility
torch_prng = set_seed(args.rnd_seed, deterministic_cudnn=False)

if is_distributed:

deepspeed.init_distributed(dist_backend=args.backend)
# Get job rank info - rank==0 master gpu
gwsize = dist.get_world_size() # global world size - per run
lwsize = torch.cuda.device_count() # local world size - per node
grank = dist.get_rank() # global rank - assign per run
lrank = dist.get_rank() % lwsize # local rank - assign per node
else:
# Use a single worker (either on GPU or CPU)
lwsize = 1
gwsize = 1
grank = 0
lrank = 0

if grank == 0:
print("TIMER: initialise:", timer() - st, "s")
print("DEBUG: local ranks:", lwsize, "/ global ranks:", gwsize)
print("DEBUG: sys.version:", sys.version)
print("DEBUG: args.data_dir:", args.data_dir)
print("DEBUG: args.log_int:", args.log_int)
print("DEBUG: args.nworker:", args.nworker)
print("DEBUG: args.prefetch:", args.prefetch)
print("DEBUG: args.batch_size:", args.batch_size)
print("DEBUG: args.epochs:", args.epochs)
print("DEBUG: args.lr:", args.lr)
print("DEBUG: args.momentum:", args.momentum)
print("DEBUG: args.shuff:", args.shuff)
print("DEBUG: args.rnd_seed:", args.rnd_seed)
print("DEBUG: args.backend:", args.backend)
print("DEBUG: args.local_rank:", args.local_rank)
print("DEBUG: args.no_cuda:", args.no_cuda, "\n")

# Encapsulate the model on the GPU assigned to the current process
if use_cuda:
torch.cuda.set_device(lrank)
Expand Down
17 changes: 7 additions & 10 deletions tutorials/distributed-ml/torch-scaling-test/horovod_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
"""Scaling test of Horovod on Imagenet using Resnet."""

import os
import sys
import time
from timeit import default_timer as timer

import horovod.torch as hvd
Expand All @@ -21,22 +19,24 @@
import torchvision
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from utils import imagenet_dataset, parse_params, train_epoch
from utils import imagenet_dataset, get_parser, train_epoch

from itwinai.loggers import EpochTimeTracker
from itwinai.torch.reproducibility import seed_worker, set_seed


def main():
# Parse CLI args
args = parse_params()
parser = get_parser()
args = parser.parse_args()

# Check resources availability
use_cuda = not args.no_cuda and torch.cuda.is_available()
is_distributed = use_cuda and torch.cuda.device_count() > 0
torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)
train_dataset = imagenet_dataset(args.data_dir)

# Setting variables
if not is_distributed:
# Use a single worker (either on GPU or CPU)
local_rank = 0
Expand Down Expand Up @@ -75,7 +75,7 @@ def main():
train_dataset,
num_replicas=global_world_size,
rank=global_rank,
shuffle=shuffle
shuffle=shuffle,
)
train_loader = DataLoader(
train_dataset,
Expand Down Expand Up @@ -117,7 +117,7 @@ def main():
)

if global_rank == 0:
num_nodes = os.environ.get("SLURM_NNODES", "unk")
num_nodes = os.environ.get("SLURM_NNODES", "1")
epoch_time_tracker = EpochTimeTracker(
strategy_name="horovod-bl",
save_path=f"epochtime_horovod-bl_{num_nodes}N.csv",
Expand All @@ -127,6 +127,7 @@ def main():
start_time = timer()
for epoch_idx in range(args.epochs):
epoch_start_time = timer()

if is_distributed:
train_sampler.set_epoch(epoch_idx)

Expand All @@ -147,10 +148,6 @@ def main():
total_time = timer() - start_time
print(f"Training finished - took {total_time:.2f}s")

time.sleep(1)
print(f"<Hvd rank: {hvd.rank()}> - TRAINING FINISHED")


if __name__ == "__main__":
main()
sys.exit()
15 changes: 2 additions & 13 deletions tutorials/distributed-ml/torch-scaling-test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def train_epoch(
return total_loss


def parse_params():
def get_parser() -> ItwinaiArgParser:
parser = ItwinaiArgParser(description="PyTorch Imagenet scaling test")

parser.add_argument(
Expand All @@ -83,11 +83,6 @@ def parse_params():
default=10,
help="log interval per training. Disabled if < 0.",
)
parser.add_argument(
"--verbose",
action=argparse.BooleanOptionalAction,
help="Print parsed arguments",
)
parser.add_argument(
"--nworker",
type=int,
Expand Down Expand Up @@ -139,10 +134,4 @@ def parse_params():
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables GPGPUs"
)

args = parser.parse_args()

if args.verbose:
args_list = [f"{key}: {val}" for key, val in args.items()]
print("PARSED ARGS:\n", "\n".join(args_list))
return args
return parser

0 comments on commit a140dd0

Please sign in to comment.