allow user to specify imagenet subset size
jarlsondre committed Jan 7, 2025
1 parent 3604aa5 commit 003a683
Showing 6 changed files with 25 additions and 182 deletions.
5 changes: 5 additions & 0 deletions tutorials/distributed-ml/torch-scaling-test/config/base.yaml
@@ -9,7 +9,12 @@

# Data and logging
data_dir: /p/scratch/intertwin/datasets/imagenet/ILSVRC2012/train/ # tmp_data/

# Subset size can be an int or None. Cannot be larger than the length of the dataset.
# If you wish to set it to "None", you must use "null" as that is what yaml expects
subset_size: null
log_int: 10

# verbose: True
nworker: 4 # num workers dataloader
prefetch: 2
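The new subset_size key is consumed by imagenet_dataset() in utils.py, whose body is not part of this diff. As a rough illustration only, a helper of this kind typically truncates the dataset with torch.utils.data.Subset when a limit is given; the sketch below uses the hypothetical name imagenet_dataset_sketch and an arbitrary transform, and is not the actual utils.py code. A YAML value of null arrives in Python as None, which means the full dataset is used.

from torch.utils.data import Subset
from torchvision import datasets, transforms


def imagenet_dataset_sketch(data_dir: str, subset_size: int | None = None):
    """Load ImageNet-style training images, optionally keeping only the first subset_size samples."""
    transform = transforms.Compose(
        [
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
        ]
    )
    dataset = datasets.ImageFolder(root=data_dir, transform=transform)
    if subset_size is not None:
        if subset_size > len(dataset):
            raise ValueError("subset_size cannot be larger than the dataset length")
        # Keep only the first subset_size examples.
        dataset = Subset(dataset, range(subset_size))
    return dataset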
3 changes: 1 addition & 2 deletions tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py
@@ -30,14 +30,13 @@ def main():
parser = get_parser()
args = parser.parse_args()

subset_size = 5000 # limit number of examples from imagenet
use_cuda = not args.no_cuda and torch.cuda.is_available()
is_distributed = use_cuda and torch.cuda.device_count() > 0
torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)
shuffle: bool = args.shuff and args.rnd_seed is None
persistent_workers = args.nworker > 1

train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)
train_dataset = imagenet_dataset(args.data_dir, subset_size=args.subset_size)
train_sampler = None

if is_distributed:
3 changes: 1 addition & 2 deletions
@@ -33,15 +33,14 @@ def main():
args = parser.parse_args()

# Check resources availability
subset_size = 5000 # limit number of examples from imagenet
use_cuda = not args.no_cuda and torch.cuda.is_available()
is_distributed = use_cuda and torch.cuda.device_count() > 0
torch_prng = set_seed(args.rnd_seed, deterministic_cudnn=False)

shuffle = args.shuff and args.rnd_seed is None
persistent_workers = args.nworker > 1

train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)
train_dataset = imagenet_dataset(args.data_dir, subset_size=args.subset_size)
train_sampler = None

if is_distributed:
3 changes: 1 addition & 2 deletions
@@ -31,15 +31,14 @@ def main():
args = parser.parse_args()

# Check resources availability
subset_size = 5000 # limit number of examples from imagenet
use_cuda = not args.no_cuda and torch.cuda.is_available()
is_distributed = use_cuda and torch.cuda.device_count() > 0
torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)

shuffle = args.shuff and args.rnd_seed is None
persistent_workers = args.nworker > 1

train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)
train_dataset = imagenet_dataset(args.data_dir, subset_size=args.subset_size)
train_sampler = None

# Setting variables
187 changes: 11 additions & 176 deletions tutorials/distributed-ml/torch-scaling-test/itwinai_trainer.py
@@ -12,197 +12,36 @@
to use checkpoints.
"""

import argparse
# import argparse
import os
import sys
from timeit import default_timer as timer
from typing import Optional
# from typing import Optional

import deepspeed
# import deepspeed
import horovod.torch as hvd
import torch
import torch.nn.functional as F
# import torch.nn.functional as F
import torchvision
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from utils import imagenet_dataset
from utils import imagenet_dataset, get_parser, train_epoch

from itwinai.loggers import EpochTimeTracker
from itwinai.parser import ArgumentParser as ItwinaiArgParser
# from itwinai.parser import ArgumentParser as ItwinaiArgParser
from itwinai.torch.distributed import (
DeepSpeedStrategy,
HorovodStrategy,
TorchDDPStrategy,
TorchDistributedStrategy,
# TorchDistributedStrategy,
)
from itwinai.torch.reproducibility import seed_worker, set_seed


def parse_params() -> argparse.Namespace:
"""
Parse CLI args, which can also be loaded from a configuration file
using the --config flag:
>>> train.py --strategy ddp --config base-config.yaml --config foo.yaml
"""
parser = ItwinaiArgParser(description="PyTorch Imagenet Example")

# Distributed ML strategy
parser.add_argument(
"--strategy",
"-s",
type=str,
choices=["ddp", "horovod", "deepspeed"],
default="ddp",
)

# Data and logging
parser.add_argument(
"--data-dir",
default="./",
help=("location of the training dataset in the local " "filesystem"),
)
parser.add_argument(
"--log-int", type=int, default=10, help="log interval per training"
)
parser.add_argument(
"--verbose",
action=argparse.BooleanOptionalAction,
help="Print parsed arguments",
)
parser.add_argument(
"--nworker",
type=int,
default=0,
help=("number of workers in DataLoader (default: 0 -" " only main)"),
)
parser.add_argument(
"--prefetch",
type=int,
default=2,
help="prefetch data in DataLoader (default: 2)",
)

# Model
parser.add_argument(
"--batch-size",
type=int,
default=64,
help="input batch size for training (default: 64)",
)
parser.add_argument(
"--epochs", type=int, default=10, help="number of epochs to train (default: 10)"
)
parser.add_argument(
"--lr", type=float, default=0.01, help="learning rate (default: 0.01)"
)
parser.add_argument(
"--momentum",
type=float,
default=0.5,
help="momentum in SGD optimizer (default: 0.5)",
)
parser.add_argument(
"--shuff",
action="store_true",
default=False,
help="shuffle dataset (default: False)",
)

# Reproducibility
parser.add_argument(
"--rnd-seed",
type=Optional[int],
default=None,
help="seed integer for reproducibility (default: 0)",
)

# Distributed ML
parser.add_argument(
"--backend",
type=str,
default="nccl",
help="backend for parrallelisation (default: nccl)",
)
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables GPGPUs"
)
parser.add_argument(
"--local_rank",
type=int,
default=-1,
help="local rank passed from distributed launcher",
)

# Horovod
parser.add_argument(
"--fp16-allreduce",
action="store_true",
default=False,
help="use fp16 compression during allreduce",
)
parser.add_argument(
"--use-adasum",
action="store_true",
default=False,
help="use adasum algorithm to do reduction",
)
parser.add_argument(
"--gradient-predivide-factor",
type=float,
default=1.0,
help=("apply gradient pre-divide factor in optimizer " "(default: 1.0)"),
)

# DeepSpeed
parser = deepspeed.add_config_arguments(parser)
args = parser.parse_args()

if args.verbose:
args_list = [f"{key}: {val}" for key, val in args.items()]
print("PARSED ARGS:\n", "\n".join(args_list))

return args


def train(
model,
device,
train_loader,
optimizer,
epoch,
strategy: TorchDistributedStrategy,
args,
):
"""Training function, representing an epoch."""
model.train()
loss_acc = 0
gwsize = strategy.global_world_size()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if (
strategy.is_main_worker
and args.log_int > 0
and batch_idx % args.log_int == 0
):
print(
f"Train epoch: {epoch} "
f"[{batch_idx * len(data)}/{len(train_loader.dataset)/gwsize} "
f"({100.0 * batch_idx / len(train_loader):.0f}%)]\t\t"
f"Loss: {loss.item():.6f}"
)
loss_acc += loss.item()
return loss_acc


def main():
# Parse CLI args
args = parse_params()
parser = get_parser()
args = parser.parse_args()

# Instantiate Strategy
if args.strategy == "ddp":
@@ -236,8 +75,7 @@ def main():
is_distributed = use_cuda and torch.cuda.device_count() > 0

# Dataset
subset_size = 5000
train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)
train_dataset = imagenet_dataset(args.data_dir, subset_size=args.subset_size)

# Set random seed for reproducibility
torch_prng = set_seed(args.rnd_seed, deterministic_cudnn=False)
@@ -312,14 +150,11 @@ def main():
train_sampler.set_epoch(epoch_idx)

# Training
train(
train_epoch(
model=model,
device=device,
train_loader=train_loader,
optimizer=optimizer,
epoch=epoch_idx,
strategy=strategy,
args=args,
)

if strategy.is_main_worker:
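The refactor replaces the local train() function deleted above with train_epoch imported from utils.py, whose body is not included in this diff (and the keyword arguments after epoch are folded out of the visible hunk). Judging from the call site and the deleted code, it presumably mirrors the old function; the sketch below restates that deleted logic under this assumption and is not necessarily the exact utils.py implementation.

import torch.nn.functional as F


def train_epoch(model, device, train_loader, optimizer, epoch, strategy, args):
    """Run one training epoch and return the accumulated loss (mirrors the deleted train())."""
    model.train()
    loss_acc = 0
    gwsize = strategy.global_world_size()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        # Log only from the main worker, at the configured interval.
        if strategy.is_main_worker and args.log_int > 0 and batch_idx % args.log_int == 0:
            print(
                f"Train epoch: {epoch} "
                f"[{batch_idx * len(data)}/{len(train_loader.dataset) / gwsize} "
                f"({100.0 * batch_idx / len(train_loader):.0f}%)]\t\t"
                f"Loss: {loss.item():.6f}"
            )
        loss_acc += loss.item()
    return loss_acc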
6 changes: 6 additions & 0 deletions tutorials/distributed-ml/torch-scaling-test/utils.py
@@ -130,6 +130,12 @@ def get_parser() -> ItwinaiArgParser:
default="nccl",
help="backend for parallelisation (default: nccl)",
)
parser.add_argument(
"--subset-size",
type=int|None,
default=None,
help="How big of a subset of ImageNet to use during training.",
)
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables GPGPUs"
)
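For completeness, a quick way to exercise the new flag (illustrative values only; this assumes the tutorial's utils.py and its dependencies such as itwinai are importable from the working directory, and that the itwinai ArgumentParser accepts the int | None annotation used above):

from utils import get_parser  # the tutorial's utils.py, assumed importable

parser = get_parser()

# Flag omitted: subset_size defaults to None (YAML "null"), i.e. the full dataset.
args = parser.parse_args([])
assert args.subset_size is None

# Explicit value: cap training at the first 5000 ImageNet examples.
args = parser.parse_args(["--subset-size", "5000"])
assert args.subset_size == 5000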
