From a69ea8bd7e7cf19dd9a810970797ee178277c283 Mon Sep 17 00:00:00 2001 From: r-sarma Date: Thu, 2 May 2024 18:41:52 +0200 Subject: [PATCH] TF updates --- .vscode/settings.json | 5 +- .../tf-scaling-test-jube/README.md | 44 +++++ .../tf-scaling-test-jube/bench_plot.ipynb | 170 +++++++++++++++++ .../tf-scaling-test-jube/general_jobsys.xml | 140 ++++++++++++++ .../tf-scaling-test-jube/jube_ddp.sh | 72 ++++++++ .../tf-scaling-test-jube/train.py | 171 ++++++++++++++++++ .../tfmirrored_slurm.sh | 70 +++++++ .../tf-tutorial-1-imagenet/train.py | 171 ++++++++++++++++++ 8 files changed, 842 insertions(+), 1 deletion(-) create mode 100644 tutorials/distributed-ml/tf-scaling-test-jube/README.md create mode 100644 tutorials/distributed-ml/tf-scaling-test-jube/bench_plot.ipynb create mode 100644 tutorials/distributed-ml/tf-scaling-test-jube/general_jobsys.xml create mode 100644 tutorials/distributed-ml/tf-scaling-test-jube/jube_ddp.sh create mode 100644 tutorials/distributed-ml/tf-scaling-test-jube/train.py create mode 100644 tutorials/distributed-ml/tf-tutorial-1-imagenet/tfmirrored_slurm.sh create mode 100644 tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py diff --git a/.vscode/settings.json b/.vscode/settings.json index 08d06d81..32e0cccb 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -59,5 +59,8 @@ "tests" ], "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true + "python.testing.pytestEnabled": true, + "python.analysis.extraPaths": [ + "./src/itwinai" + ] } \ No newline at end of file diff --git a/tutorials/distributed-ml/tf-scaling-test-jube/README.md b/tutorials/distributed-ml/tf-scaling-test-jube/README.md new file mode 100644 index 00000000..bc2cab1c --- /dev/null +++ b/tutorials/distributed-ml/tf-scaling-test-jube/README.md @@ -0,0 +1,44 @@ +# Benchmarking tutorial using JUBE + +Benchmarking of itwinai can also be performed with the JUBE Benchmarking Environment from JSC. 
+The JUBE benchmarking tool is already set up in the environment files provided under `env-files`. + +## Source the environment + +Find the location of your environment file along with the module load commands, such as: + +```bash +ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12 +source envAI_hdfml/bin/activate +``` + +## Run benchmark + +The benchmarks are defined in the `general_jobsys.xml` file. +One can specify the configurations in terms of parameters such as the number of nodes. +The benchmark can be simply launched with the command: + +```bash +jube run general_jobsys.xml +``` + +## Monitor status of benchmark run + +The status of the run can be monitored with: + +```bash +jube continue bench_run --id last +``` + +## Check results of the benchmark run + +The results can be viewed with: + +```bash +jube result -a bench_run --id last +``` + +This will create a `result-csv.dat` file in the `results` folder. + +The scaling and efficiency plots can be generated with the `bench_plot.ipynb` file +which takes the `result-csv.dat` file as input. 
diff --git a/tutorials/distributed-ml/tf-scaling-test-jube/bench_plot.ipynb b/tutorials/distributed-ml/tf-scaling-test-jube/bench_plot.ipynb new file mode 100644 index 00000000..fda6cd13 --- /dev/null +++ b/tutorials/distributed-ml/tf-scaling-test-jube/bench_plot.ipynb @@ -0,0 +1,170 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Plot benchmark results of itwinai" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os, pandas as pd, matplotlib.pyplot as plt, numpy as np\n", + "%matplotlib inline\n", + "pd.options.display.max_columns = None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "plt.rcParams['figure.figsize'] = [12, 6]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "df = pd.read_csv('result-csv.dat',header=0)\n", + "df.rename(columns=lambda x: x.split('[')[0], inplace=True)\n", + "\n", + "# gpus\n", + "df[\"NGPUs\"] = df[\"Nnodes\"]*4\n", + "\n", + "# speedup\n", + "df[\"Speedup - ideal\"] = df[\"Nnodes\"].astype(float)\n", + "df[\"Speedup\"] = df[\"Naet\"].iloc[0] / df[\"Naet\"]\n", + "\n", + "# efficiency\n", + "df[\"Threadscaled Sim. Time / s\"] = df[\"Naet\"] * df[\"Nnodes\"] * df[\"Nworkers\"]\n", + "df[\"Efficiency\"] = df[\"Threadscaled Sim. Time / s\"].iloc[0] / df[\"Threadscaled Sim. 
Time / s\"]\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Overview" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "ax = df.pivot_table(index=[\"NGPUs\"], columns=[\"Nworkers\"], values=\"Naet\").plot(kind=\"bar\", title=\"Runtime behaviour\");\n", + "ax.set_ylabel(\"Epoch Time / s\");\n", + "ax_abs = ax\n", + "for p in ax.patches:\n", + " ax.annotate(\"{:.2f} s\".format(p.get_height()), (p.get_x() + p.get_width()/1.33, p.get_height() * 1.01), \\\n", + " color=\"dimgray\", horizontalalignment=\"center\", verticalalignment=\"bottom\", rotation=\"vertical\")\n", + "pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Scaling Behaviour" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "ax = df.pivot_table(index=[\"NGPUs\"], columns=[\"Nworkers\"], values=\"Speedup\").plot(style=\"*-\", \\\n", + " loglog=False, title=\"Scaling behaviour\", color=\"r\", legend=False);\n", + "ax.plot(df[\"NGPUs\"].values,df[\"Speedup - ideal\"].values,ls='dashed',lw=1.0,c='k',label=\"ideal\")\n", + "\n", + "ax.legend(ncol=1, title=\"(Nworkers)\")\n", + "ax.set_xticks(df[\"NGPUs\"].values)\n", + "ax.set_yticks(df[\"Speedup - ideal\"].values)\n", + "ax.set_ylabel(r'Speedup')\n", + "ax.set_xlim((0,np.amax(df[\"NGPUs\"].values+1)))\n", + "ax.set_ylim((0,np.amax(df[\"Speedup - ideal\"].values+1)))\n", + "\n", + "pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Runtime Efficiencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "ax = df.pivot_table(index=[\"NGPUs\"], columns=[\"Nworkers\"], values=\"Efficiency\").plot(kind=\"bar\", \\\n", + " legend=False, title=\"Runtime efficiency\")\n", + "ax.legend(ncol=1, title=\"(Ntasks, 
Ncells)\",loc=4)\n", + "ax.set_ylabel(\"Efficiency\");\n", + "for p, abs in zip(ax.patches, ax_abs.patches):\n", + " ax.annotate(\"{:.2f}\".format(p.get_height()), (p.get_x() + p.get_width()/1.33, p.get_height() * 1.01), \\\n", + " color=\"dimgray\", horizontalalignment=\"center\", verticalalignment=\"bottom\", rotation=\"vertical\")\n", + " ax.annotate(\"Abs: {:.1f} s\".format(abs.get_height()), (p.get_x() + p.get_width()/1.33, p.get_height() * 0.95), \\\n", + " color=\"white\", horizontalalignment=\"center\", verticalalignment=\"top\", rotation=\"vertical\")\n", + "ax.plot(df[\"NGPUs\"].values-8,df[\"Speedup - ideal\"].values*0+1,ls='dashed',lw=1.0,c='r',label=\"ideal\")\n", + "pass" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# EOF" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.8" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/tutorials/distributed-ml/tf-scaling-test-jube/general_jobsys.xml b/tutorials/distributed-ml/tf-scaling-test-jube/general_jobsys.xml new file mode 100644 index 00000000..6f981f57 --- /dev/null +++ b/tutorials/distributed-ml/tf-scaling-test-jube/general_jobsys.xml @@ -0,0 +1,140 @@ + + + + General benchmark script + + + + + 1,2,4,8 + + 8 + + train.py + + + + + if [ -f /etc/FZJ/systemname ]; then cat /etc/FZJ/systemname | tr -d "\n"; else uname -n | head -c 3; fi + sbatch + $iterNO + $iterNW + ready + jube_ddp.sh + + { "hdfml": 4, + }["${systemname}"] + + intertwin + + 04:00:00 + + { "hdfml": "batch", + }["${systemname}"] + + + 00:10:00 + + { "hdfml": "batch", + }["${systemname}"] + + + + + { + "hdfml": "ml ml Stages/2024 
GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12", + }["${systemname}"] + + source /p/project/intertwin/rakesh/repo_push/itwinai/envAItf_hdfml/bin/activate + { + "hdfml": "export CUDA_VISIBLE_DEVICES=0,1,2,3" + }["${systemname}"] + + + + + + $job_file + $script + + + + + + + + + + + + + + + + + + + + + paramset + executeset + envirset + files,sub_job + echo "nID: $jube_wp_id" + + $submit_cmd $job_file + + + + + + ${jube_wp_id} + ${nodes} + ${nnw} + \s*TIMER: total epoch time:\s+$jube_pat_wrd\s* + \s*TIMER: average epoch time:\s+$jube_pat_wrd\s* + ${avgEpochT} + + + + + pattern + + stdout + job.out + + + + + + analyse + + ID + Nnodes + Nworkers + calcTime + avgEpochT + Naet + memoryGPU +
+
+ + + + analyse + + ID + Nnodes + Nworkers + calcTime + avgEpochT + Naet + memoryGPU +
+
+ +
+
+ + + diff --git a/tutorials/distributed-ml/tf-scaling-test-jube/jube_ddp.sh b/tutorials/distributed-ml/tf-scaling-test-jube/jube_ddp.sh new file mode 100644 index 00000000..adafae78 --- /dev/null +++ b/tutorials/distributed-ml/tf-scaling-test-jube/jube_ddp.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +# general configuration of the job +#SBATCH --job-name=JUBE_DDP +#SBATCH --account=#ACC# +#SBATCH --mail-user= +#SBATCH --mail-type=ALL +#SBATCH --output=job.out +#SBATCH --error=job.err +#SBATCH --time=#TIMELIM# + +# configure node and process count on the CM +#SBATCH --partition=#QUEUE# +#SBATCH --nodes=#NODES# +#SBATCH --cpus-per-task=#NW# +#SBATCH --gpus-per-node=#NGPU# +#SBATCH --exclusive + +# gres options have to be disabled for deepv +#SBATCH --gres=gpu:4 + +set -x +unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY + +# set modules +ml --force purge +ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12 + +# set env +source /p/project/intertwin/rakesh/repo_push/itwinai/envAItf_hdfml/bin/activate + +# Using legacy (2.16) version of Keras +# Latest version with TF (2.16) installs Keras 3.3 +# which returns an error for multi-node execution +export TF_USE_LEGACY_KERAS=1 + +# sleep a sec +sleep 1 + +# job info +echo "DEBUG: TIME: $(date)" +echo "DEBUG: EXECUTE: $EXEC" +echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR" +echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID" +echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST" +echo "DEBUG: SLURM_NNODES: $SLURM_NNODES" +echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS" +echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE" +echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST" +echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME" +echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES" +echo "DEBUG: SLURM_NODELIST: $SLURM_NODELIST" +echo + +# set comm +export CUDA_VISIBLE_DEVICES="0,1,2,3" +export OMP_NUM_THREADS=1 +if [ "$SLURM_CPUS_PER_TASK" -gt 0 ] ; then + export 
OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK +fi + +dataDir='/p/scratch/intertwin/datasets/imagenet/' + +COMMAND="train.py" + +EXEC="$COMMAND \ + --data_dir $dataDir" + +srun python -u $EXEC + + +#eof diff --git a/tutorials/distributed-ml/tf-scaling-test-jube/train.py b/tutorials/distributed-ml/tf-scaling-test-jube/train.py new file mode 100644 index 00000000..4bd4ff58 --- /dev/null +++ b/tutorials/distributed-ml/tf-scaling-test-jube/train.py @@ -0,0 +1,171 @@ +""" + Show how to use TensorFlow MultiWorkerMirroredStrategy on itwinai. + for an Imagenet dataset + with SLURM: + >>> sbatch tfmirrored_slurm.sh + + """ +import argparse +import sys +from timeit import default_timer as timer + +import tensorflow as tf +from tensorflow import keras +from tensorflow.keras.layers import Dense, GlobalAveragePooling2D +from tensorflow.keras.models import Model + +from itwinai.tensorflow.distributed import get_strategy + + +def parse_args(): + """ + Parse args + """ + parser = argparse.ArgumentParser(description='TensorFlow ImageNet') + + parser.add_argument( + "--strategy", "-s", type=str, + choices=['mirrored'], + default='mirrored' + ) + parser.add_argument( + "--data_dir", type=str, + default='./' + ) + parser.add_argument( + "--batch_size", type=int, + default=128 + ) + parser.add_argument( + "--epochs", type=int, + default=3 + ) + + args = parser.parse_args() + return args + + +def deserialization_fn(serialized_fn): + """Imagenet data processing + + Args: + serialized_example (Any): Input function + + Returns: + Any: Images and associated labels + """ + parsed_example = tf.io.parse_single_example( + serialized_fn, + features={ + 'image/encoded': tf.io.FixedLenFeature([], tf.string), + 'image/class/label': tf.io.FixedLenFeature([], tf.int64), + } + ) + image = tf.image.decode_jpeg(parsed_example['image/encoded'], channels=3) + image = tf.image.resize(image, (224, 224)) + label = tf.cast(parsed_example['image/class/label'], tf.int64) - 1 + return image, label + + +def 
tf_records_loader(files_path, shuffle=False): + """tf_records dataset reader + + Args: + files_path (String): Path to location of data + shuffle (bool, optional): If dataset should be shuffled. + Defaults to False. + + Returns: + tf.data.Dataset: Returns dataset to be trained + """ + datasets = tf.data.Dataset.from_tensor_slices(files_path) + datasets = datasets.shuffle(len(files_path)) if shuffle else datasets + datasets = datasets.flat_map(tf.data.TFRecordDataset) + datasets = datasets.map( + deserialization_fn, num_parallel_calls=tf.data.AUTOTUNE) + return datasets + + +def main(): + args = parse_args() + + input_shape = (224, 224, 3) + num_classes = 1000 + + if args.strategy == 'mirrored': + strategy = get_strategy()[0] + else: + raise NotImplementedError( + f"Strategy {args.strategy} is not recognized/implemented.") + + with strategy.scope(): + base_model = keras.applications.ResNet50( + weights=None, + input_shape=input_shape, + include_top=False, + ) + + x = base_model.output + x = GlobalAveragePooling2D()(x) + x = Dense(1024, activation='relu')(x) + predictions = Dense(num_classes, activation='softmax')(x) + + model = Model(inputs=base_model.input, outputs=predictions) + + model.compile(loss=keras.losses.sparse_categorical_crossentropy, + optimizer=keras.optimizers.Adam(), + metrics=['accuracy'] + ) + + # scale batch size with number of workers + batch_size = args.batch_size * get_strategy()[1] + + dir_imagenet = args.data_dir+'imagenet-1K-tfrecords' + train_shard_suffix = 'train-*-of-01024' + test_shard_suffix = 'validation-*-of-00128' + + train_set_path = sorted( + tf.io.gfile.glob(dir_imagenet + f'/{train_shard_suffix}') + ) + test_set_path = sorted( + tf.io.gfile.glob(dir_imagenet + f'/{test_shard_suffix}') + ) + + train_dataset = tf_records_loader(train_set_path, shuffle=True) + test_dataset = tf_records_loader(test_set_path) + + train_dataset = train_dataset.batch( + batch_size).prefetch(tf.data.experimental.AUTOTUNE) + test_dataset = 
test_dataset.batch( + batch_size).prefetch(tf.data.experimental.AUTOTUNE) + + # distribute datasets among mirrored replicas + dist_train = strategy.experimental_distribute_dataset( + train_dataset + ) + dist_test = strategy.experimental_distribute_dataset( + test_dataset + ) + + # TODO: add callbacks to evaluate per epoch time + et = timer() + + # trains the model + model.fit(dist_train, epochs=args.epochs, steps_per_epoch=2000, verbose=10) + + print('TIMER: total epoch time:', + timer() - et, ' s') + print('TIMER: average epoch time:', + (timer() - et) / (args.epochs), ' s') + + test_scores = model.evaluate(dist_test, steps=100, verbose=5) + + print('Test loss:', test_scores[0]) + print('Test accuracy:', test_scores[1]) + + +if __name__ == "__main__": + main() + sys.exit() + +# eof diff --git a/tutorials/distributed-ml/tf-tutorial-1-imagenet/tfmirrored_slurm.sh b/tutorials/distributed-ml/tf-tutorial-1-imagenet/tfmirrored_slurm.sh new file mode 100644 index 00000000..7c886f14 --- /dev/null +++ b/tutorials/distributed-ml/tf-tutorial-1-imagenet/tfmirrored_slurm.sh @@ -0,0 +1,70 @@ +#!/bin/bash + +# general configuration of the job +#SBATCH --job-name=TFTest +#SBATCH --account=intertwin +#SBATCH --mail-user= +#SBATCH --mail-type=ALL +#SBATCH --output=job.out +#SBATCH --error=job.err +#SBATCH --time=01:00:00 + +# configure node and process count on the CM +#SBATCH --partition=batch +#SBATCH --nodes=4 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=32 +#SBATCH --gpus-per-node=4 +#SBATCH --exclusive + +# gres options have to be disabled for deepv +#SBATCH --gres=gpu:4 + +set -x +unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY + +# set modules +ml --force purge +ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12 + +# set env +source /p/project/intertwin/rakesh/repo_push/itwinai/envAItf_hdfml/bin/activate + +# Using legacy (2.16) version of Keras +# Latest version with TF (2.16) installs 
Keras 3.3 +# which returns an error for multi-node execution +export TF_USE_LEGACY_KERAS=1 + +# sleep a sec +sleep 1 + +# job info +echo "DEBUG: TIME: $(date)" +echo "DEBUG: EXECUTE: $EXEC" +echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR" +echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID" +echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST" +echo "DEBUG: SLURM_NNODES: $SLURM_NNODES" +echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS" +echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE" +echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST" +echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME" +echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES" +echo "DEBUG: SLURM_NODELIST: $SLURM_NODELIST" +echo + +# set comm +export CUDA_VISIBLE_DEVICES="0,1,2,3" +export OMP_NUM_THREADS=1 +if [ "$SLURM_CPUS_PER_TASK" -gt 0 ] ; then + export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK +fi + +dataDir='/p/scratch/intertwin/datasets/imagenet/' + +COMMAND="train.py" + +EXEC="$COMMAND \ + --data_dir $dataDir" + +srun python -u $EXEC diff --git a/tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py b/tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py new file mode 100644 index 00000000..4bd4ff58 --- /dev/null +++ b/tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py @@ -0,0 +1,171 @@ +""" + Show how to use TensorFlow MultiWorkerMirroredStrategy on itwinai. 
+ for an Imagenet dataset + with SLURM: + >>> sbatch tfmirrored_slurm.sh + + """ +import argparse +import sys +from timeit import default_timer as timer + +import tensorflow as tf +from tensorflow import keras +from tensorflow.keras.layers import Dense, GlobalAveragePooling2D +from tensorflow.keras.models import Model + +from itwinai.tensorflow.distributed import get_strategy + + +def parse_args(): + """ + Parse args + """ + parser = argparse.ArgumentParser(description='TensorFlow ImageNet') + + parser.add_argument( + "--strategy", "-s", type=str, + choices=['mirrored'], + default='mirrored' + ) + parser.add_argument( + "--data_dir", type=str, + default='./' + ) + parser.add_argument( + "--batch_size", type=int, + default=128 + ) + parser.add_argument( + "--epochs", type=int, + default=3 + ) + + args = parser.parse_args() + return args + + +def deserialization_fn(serialized_fn): + """Imagenet data processing + + Args: + serialized_example (Any): Input function + + Returns: + Any: Images and associated labels + """ + parsed_example = tf.io.parse_single_example( + serialized_fn, + features={ + 'image/encoded': tf.io.FixedLenFeature([], tf.string), + 'image/class/label': tf.io.FixedLenFeature([], tf.int64), + } + ) + image = tf.image.decode_jpeg(parsed_example['image/encoded'], channels=3) + image = tf.image.resize(image, (224, 224)) + label = tf.cast(parsed_example['image/class/label'], tf.int64) - 1 + return image, label + + +def tf_records_loader(files_path, shuffle=False): + """tf_records dataset reader + + Args: + files_path (String): Path to location of data + shuffle (bool, optional): If dataset should be shuffled. + Defaults to False. 
+ + Returns: + tf.data.Dataset: Returns dataset to be trained + """ + datasets = tf.data.Dataset.from_tensor_slices(files_path) + datasets = datasets.shuffle(len(files_path)) if shuffle else datasets + datasets = datasets.flat_map(tf.data.TFRecordDataset) + datasets = datasets.map( + deserialization_fn, num_parallel_calls=tf.data.AUTOTUNE) + return datasets + + +def main(): + args = parse_args() + + input_shape = (224, 224, 3) + num_classes = 1000 + + if args.strategy == 'mirrored': + strategy = get_strategy()[0] + else: + raise NotImplementedError( + f"Strategy {args.strategy} is not recognized/implemented.") + + with strategy.scope(): + base_model = keras.applications.ResNet50( + weights=None, + input_shape=input_shape, + include_top=False, + ) + + x = base_model.output + x = GlobalAveragePooling2D()(x) + x = Dense(1024, activation='relu')(x) + predictions = Dense(num_classes, activation='softmax')(x) + + model = Model(inputs=base_model.input, outputs=predictions) + + model.compile(loss=keras.losses.sparse_categorical_crossentropy, + optimizer=keras.optimizers.Adam(), + metrics=['accuracy'] + ) + + # scale batch size with number of workers + batch_size = args.batch_size * get_strategy()[1] + + dir_imagenet = args.data_dir+'imagenet-1K-tfrecords' + train_shard_suffix = 'train-*-of-01024' + test_shard_suffix = 'validation-*-of-00128' + + train_set_path = sorted( + tf.io.gfile.glob(dir_imagenet + f'/{train_shard_suffix}') + ) + test_set_path = sorted( + tf.io.gfile.glob(dir_imagenet + f'/{test_shard_suffix}') + ) + + train_dataset = tf_records_loader(train_set_path, shuffle=True) + test_dataset = tf_records_loader(test_set_path) + + train_dataset = train_dataset.batch( + batch_size).prefetch(tf.data.experimental.AUTOTUNE) + test_dataset = test_dataset.batch( + batch_size).prefetch(tf.data.experimental.AUTOTUNE) + + # distribute datasets among mirrored replicas + dist_train = strategy.experimental_distribute_dataset( + train_dataset + ) + dist_test = 
strategy.experimental_distribute_dataset( + test_dataset + ) + + # TODO: add callbacks to evaluate per epoch time + et = timer() + + # trains the model + model.fit(dist_train, epochs=args.epochs, steps_per_epoch=2000, verbose=10) + + print('TIMER: total epoch time:', + timer() - et, ' s') + print('TIMER: average epoch time:', + (timer() - et) / (args.epochs), ' s') + + test_scores = model.evaluate(dist_test, steps=100, verbose=5) + + print('Test loss:', test_scores[0]) + print('Test accuracy:', test_scores[1]) + + +if __name__ == "__main__": + main() + sys.exit() + +# eof