Commit

update deepspeed trainer
jarlsondre committed Dec 17, 2024
1 parent a140dd0 commit 9f7b1ab
Showing 2 changed files with 48 additions and 102 deletions.
tutorials/distributed-ml/torch-scaling-test/ddp_trainer.py (3 changes: 1 addition & 2 deletions)
@@ -12,7 +12,6 @@
 """Scaling test of torch Distributed Data Parallel on Imagenet using Resnet."""
 
 import os
-import sys
 from timeit import default_timer as timer
 
 import torch
@@ -31,7 +30,7 @@ def main():
     parser = get_parser()
     args = parser.parse_args()
 
-    subset_size = 5000
+    subset_size = 5000  # limit number of examples from imagenet
     use_cuda = not args.no_cuda and torch.cuda.is_available()
     is_distributed = use_cuda and torch.cuda.device_count() > 0
     torch_seed = set_seed(args.rnd_seed, deterministic_cudnn=False)
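
The `subset_size` cap keeps the scaling test quick by drawing only 5000 Imagenet examples. How `imagenet_dataset` applies the cap is not visible in this diff; a common way to implement such a limit (an assumed sketch, not the repository's code; `full_imagenet` is a placeholder) is `torch.utils.data.Subset`:

    from torch.utils.data import Subset

    # Keep only the first `subset_size` examples of the full dataset.
    train_dataset = Subset(full_imagenet, range(subset_size))
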
tutorials/distributed-ml/torch-scaling-test/deepspeed_trainer.py (147 changes: 47 additions & 100 deletions)
@@ -10,10 +10,9 @@
 """Scaling test of Microsoft Deepspeed on Imagenet using Resnet."""
 
 import os
-import sys
-import time
 from timeit import default_timer as timer
 
+
 import deepspeed
 import torch
 import torch.distributed as dist
@@ -23,7 +22,7 @@
 from utils import imagenet_dataset, get_parser, train_epoch
 
 from itwinai.loggers import EpochTimeTracker
-from itwinai.torch.reproducibility import seed_worker, set_seed
+from itwinai.torch.reproducibility import set_seed
 
 
 def main():
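
The dropped `seed_worker` import was only needed as a `worker_init_fn` for the hand-built `DataLoader`s that this commit comments out below. For reference, such a hook is usually PyTorch's standard worker-seeding recipe (a sketch; itwinai's actual implementation may differ):

    import random

    import numpy as np
    import torch

    def seed_worker(worker_id: int) -> None:
        # Derive a per-worker seed from the DataLoader's base seed so that
        # NumPy and `random` stay reproducible across workers.
        worker_seed = torch.initial_seed() % 2**32
        np.random.seed(worker_seed)
        random.seed(worker_seed)
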
@@ -33,71 +32,56 @@ def main():
     args = parser.parse_args()
 
     # Check resources availability
+    subset_size = 5000  # limit number of examples from imagenet
     use_cuda = not args.no_cuda and torch.cuda.is_available()
-    is_distributed = False
-    if use_cuda and torch.cuda.device_count() > 0:
-        is_distributed = True
-
-    # Limit # of CPU threads to be used per worker
-    # torch.set_num_threads(1)
+    is_distributed = use_cuda and torch.cuda.device_count() > 0
+    torch_prng = set_seed(args.rnd_seed, deterministic_cudnn=False)
 
-    # Start the timer for profiling
-    st = timer()
-
-    # Set random seed for reproducibility
-    torch_prng = set_seed(args.rnd_seed, deterministic_cudnn=False)
-
+    train_dataset = imagenet_dataset(args.data_dir, subset_size=subset_size)
     if is_distributed:
-
         deepspeed.init_distributed(dist_backend=args.backend)
-        # Get job rank info - rank==0 master gpu
-        lwsize = torch.cuda.device_count()  # local world size - per node
-        grank = dist.get_rank()  # global rank - assign per run
-        lrank = dist.get_rank() % lwsize  # local rank - assign per node
-    else:
-        # Use a single worker (either on GPU or CPU)
-        lwsize = 1
-        grank = 0
-        lrank = 0
-
-    # Encapsulate the model on the GPU assigned to the current process
-    if use_cuda:
-        torch.cuda.set_device(lrank)
+        local_world_size = torch.cuda.device_count()
+        global_rank = dist.get_rank()
+        local_rank = dist.get_rank() % local_world_size
 
-    # Read training dataset
-    train_dataset = imagenet_dataset(args.data_dir)
-    shuffle = args.shuff and args.rnd_seed is None
+        # pin_memory=True
+        # persistent_workers = args.nworker > 1
 
-    if is_distributed:
         # Distributed sampler restricts data loading to a subset of the dataset
         # exclusive to the current process.
         # `num_replicas` and `rank` are automatically retrieved from
         # the current distributed group.
         train_sampler = DistributedSampler(
             train_dataset,  # num_replicas=gwsize, rank=grank,
+            shuffle=(args.shuff and args.rnd_seed is None),
         )
-
-        train_loader = DataLoader(
-            train_dataset,
-            batch_size=args.batch_size,
-            sampler=train_sampler,
-            num_workers=args.nworker,
-            pin_memory=True,
-            persistent_workers=(args.nworker > 1),
-            prefetch_factor=args.prefetch,
-            generator=torch_prng,
-            worker_init_fn=seed_worker,
-            shuffle=shuffle,
-        )
+        # train_loader = DataLoader(
+        #     train_dataset,
+        #     batch_size=args.batch_size,
+        #     sampler=train_sampler,
+        #     num_workers=args.nworker,
+        #     pin_memory=pin_memory,
+        #     persistent_workers=persistent_workers,
+        #     prefetch_factor=args.prefetch,
+        #     generator=torch_prng,
+        #     worker_init_fn=seed_worker,
+        # )
     else:
-        train_loader = DataLoader(
-            train_dataset,
-            batch_size=args.batch_size,
-            generator=torch_prng,
-            worker_init_fn=seed_worker,
-        )
+        # Use a single worker (either on GPU or CPU)
+        local_world_size = 1
+        global_rank = 0
+        local_rank = 0
+        # train_loader = DataLoader(
+        #     train_dataset,
+        #     batch_size=args.batch_size,
+        #     generator=torch_prng,
+        #     worker_init_fn=seed_worker,
+        # )
+
+    device = torch.device(f"cuda:{local_rank}" if use_cuda else "cpu")
+
     # Create CNN model
-    model = torchvision.models.resnet152()
+    model = torchvision.models.resnet152().to(device)
 
     # Initialize DeepSpeed and get:
     # 1) Distributed model
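
The `deepspeed.initialize` call that produces `distrib_model`, `optimizer`, and `deepspeed_train_loader` is collapsed in this view; only its closing parenthesis appears as context in the next hunk. A call of the following shape would match those names (a sketch based on DeepSpeed's documented API; `deepspeed_config` is a hypothetical config dict or file path):

    # deepspeed.initialize returns (engine, optimizer, dataloader, lr_scheduler);
    # passing training_data lets DeepSpeed build the distributed dataloader itself.
    distrib_model, optimizer, deepspeed_train_loader, _ = deepspeed.initialize(
        args=args,
        model=model,
        model_parameters=model.parameters(),
        training_data=train_dataset,
        config=deepspeed_config,
    )
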
@@ -121,71 +105,35 @@
     )
 
     # Start training loop
-    if grank == 0:
-        print("TIMER: broadcast:", timer() - st, "s")
-        print("\nDEBUG: start training")
-        print("--------------------------------------------------------")
-        nnod = os.environ.get("SLURM_NNODES", "unk")
+    if global_rank == 0:
+        num_nodes = os.environ.get("SLURM_NNODES", "1")
         epoch_time_tracker = EpochTimeTracker(
             strategy_name="deepspeed-bl",
-            save_path=f"epochtime_deepspeed-bl_{nnod}N.csv",
-            num_nodes=int(nnod),
+            save_path=f"epochtime_deepspeed-bl_{num_nodes}N.csv",
+            num_nodes=int(num_nodes),
         )
 
-    et = timer()
     start_epoch = 1
     for epoch in range(start_epoch, args.epochs + 1):
-        lt = timer()
+        epoch_start_time = timer()
         if is_distributed:
             # Inform the sampler that a new epoch started: shuffle
             # may be needed
             train_sampler.set_epoch(epoch)
 
         # Training
         train_epoch(
-            model=distrib_model, device=device, train_loader=train_loader, optimizer=optimizer
+            model=distrib_model, device=device, train_loader=deepspeed_train_loader, optimizer=optimizer
         )
 
-        # Save first epoch timer
-        if epoch == start_epoch:
-            first_ep_t = timer() - lt
-
-        # Final epoch
-        if epoch + 1 == args.epochs:
-            train_loader.last_epoch = True
-
-        if grank == 0:
-            print("TIMER: epoch time:", timer() - lt, "s")
-            epoch_time_tracker.add_epoch_time(epoch - 1, timer() - lt)
+        if global_rank == 0:
+            epoch_elapsed_time = timer() - epoch_start_time
+            epoch_time_tracker.add_epoch_time(epoch - 1, epoch_elapsed_time)
+            print(f"[{epoch}/{args.epochs}] - time: {epoch_elapsed_time:.2f}s")
 
     if is_distributed:
         dist.barrier()
 
-    if grank == 0:
-        print("\n--------------------------------------------------------")
-        print("DEBUG: results:\n")
-        print("TIMER: first epoch time:", first_ep_t, " s")
-        print("TIMER: last epoch time:", timer() - lt, " s")
-        print("TIMER: average epoch time:", (timer() - et) / args.epochs, " s")
-        print("TIMER: total epoch time:", timer() - et, " s")
-        if epoch > 1:
-            print("TIMER: total epoch-1 time:", timer() - et - first_ep_t, " s")
-            print(
-                "TIMER: average epoch-1 time:",
-                (timer() - et - first_ep_t) / (args.epochs - 1),
-                " s",
-            )
-        if use_cuda:
-            print(
-                "DEBUG: memory req:",
-                int(torch.cuda.memory_reserved(lrank) / 1024 / 1024),
-                "MB",
-            )
-        print("DEBUG: memory summary:\n\n", torch.cuda.memory_summary(0))
-        print(f"TIMER: final time: {timer()-st} s\n")
-
-    time.sleep(1)
-    print(f"<Global rank: {grank}> - TRAINING FINISHED")
 
     # Clean-up
     if is_distributed:
@@ -194,4 +142,3 @@
 
 if __name__ == "__main__":
     main()
-    sys.exit()
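
Stripped of the surrounding loop, the per-epoch timing pattern this commit introduces is simply (a sketch reusing the names from the diff; the `train_epoch` call is elided):

    from timeit import default_timer as timer

    epoch_start_time = timer()
    # ... run train_epoch(model=..., device=..., train_loader=..., optimizer=...) ...
    epoch_elapsed_time = timer() - epoch_start_time  # seconds
    epoch_time_tracker.add_epoch_time(epoch - 1, epoch_elapsed_time)
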
