diff --git a/returnn/training.py b/returnn/training.py
index edf780c3..1ae3f0b7 100644
--- a/returnn/training.py
+++ b/returnn/training.py
@@ -118,6 +118,8 @@ class ReturnnTrainingJob(Job):
     function of Returnn not all checkpoints are actually available.
     """
 
+    __sis_hash_exclude__ = {"distributed_launch_cmd": "mpirun"}
+
     def __init__(
         self,
         returnn_config: ReturnnConfig,
@@ -130,6 +132,7 @@ def __init__(
         time_rqmt: float = 4,
         mem_rqmt: float = 4,
         cpu_rqmt: int = 2,
+        distributed_launch_cmd: str = "mpirun",
         horovod_num_processes: Optional[int] = None,
         multi_node_slots: Optional[int] = None,
         returnn_python_exe: Optional[tk.Path] = None,
@@ -149,6 +152,9 @@ def __init__(
         :param time_rqmt:
         :param mem_rqmt:
         :param cpu_rqmt:
+        :param distributed_launch_cmd: the command used to launch training jobs; only used if horovod_num_processes is not None.
+            Possible values: "mpirun": use mpirun, cf. https://www.open-mpi.org/doc/v4.0/man1/mpirun.1.php
+                             "torchrun": use torchrun, cf. https://pytorch.org/docs/stable/elastic/run.html
         :param horovod_num_processes: If used without multi_node_slots, then single node, otherwise multi node.
         :param multi_node_slots: multi-node multi-GPU training. See Sisyphus rqmt documentation. Currently only with Horovod,
@@ -158,12 +164,14 @@ def __init__(
         :param returnn_root: file path to the RETURNN repository root folder
         """
         assert isinstance(returnn_config, ReturnnConfig)
+        assert distributed_launch_cmd in ["mpirun", "torchrun"]
         self.check_blacklisted_parameters(returnn_config)
         kwargs = locals()
         del kwargs["self"]
 
         self.returnn_python_exe = util.get_returnn_python_exe(returnn_python_exe)
         self.returnn_root = util.get_returnn_root(returnn_root)
+        self.distributed_launch_cmd = distributed_launch_cmd
         self.horovod_num_processes = horovod_num_processes
         self.multi_node_slots = multi_node_slots
         self.returnn_config = ReturnnTrainingJob.create_returnn_config(**kwargs)
@@ -240,25 +248,35 @@ def _get_run_cmd(self):
         ]
         if self.horovod_num_processes:
-            # Normally, if the engine (e.g. SGE or Slurm) is configured correctly,
-            # it automatically provides the information on multiple nodes to mpirun,
-            # so it is not needed to explicitly pass on any hostnames here.
-            run_cmd = [
-                "mpirun",
-                "-np",
-                str(self.horovod_num_processes),
-                "-bind-to",
-                "none",
-                "-map-by",
-                "slot",
-                "-mca",
-                "pml",
-                "ob1",
-                "-mca",
-                "btl",
-                "^openib",
-                "--report-bindings",
-            ] + run_cmd
+            if self.distributed_launch_cmd == "torchrun":
+                # use torchrun to launch DDP training when the backend is torch
+                run_cmd = [
+                    "torchrun",
+                    f"--nnodes={self.multi_node_slots or 1}",
+                    f"--nproc-per-node={self.horovod_num_processes}",
+                ] + run_cmd[1:]
+            elif self.distributed_launch_cmd == "mpirun":
+                # Normally, if the engine (e.g. SGE or Slurm) is configured correctly,
+                # it automatically provides the information on multiple nodes to mpirun,
+                # so it is not needed to explicitly pass on any hostnames here.
+                run_cmd = [
+                    "mpirun",
+                    "-np",
+                    str(self.horovod_num_processes),
+                    "-bind-to",
+                    "none",
+                    "-map-by",
+                    "slot",
+                    "-mca",
+                    "pml",
+                    "ob1",
+                    "-mca",
+                    "btl",
+                    "^openib",
+                    "--report-bindings",
+                ] + run_cmd
+            else:
+                raise ValueError(f"invalid distributed_launch_cmd {self.distributed_launch_cmd!r}")
         return run_cmd
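
For reference, a minimal sketch of how a recipe might opt into the new launcher. The import path and the `make_returnn_config()` helper are assumptions for illustration; only parameters that appear in the signature above are passed explicitly, everything else keeps its default.

```python
from i6_core.returnn.training import ReturnnTrainingJob  # import path assumed; adjust to your setup

# Hypothetical helper building a ReturnnConfig with the torch backend.
returnn_config = make_returnn_config()

train_job = ReturnnTrainingJob(
    returnn_config=returnn_config,
    horovod_num_processes=4,            # one worker process per GPU on each node
    multi_node_slots=2,                 # optional; maps to --nnodes=2 for torchrun
    distributed_launch_cmd="torchrun",  # new parameter, defaults to "mpirun"
    time_rqmt=80,
    mem_rqmt=16,
    cpu_rqmt=4,
)
# _get_run_cmd() would then prefix the training command roughly as:
#   torchrun --nnodes=2 --nproc-per-node=4 <training script> <config> ...
# torchrun starts the Python interpreter itself, which is why the torchrun branch
# uses run_cmd[1:] and drops the returnn_python_exe entry from the command.
```

Because of the `__sis_hash_exclude__` entry, jobs that keep the default `"mpirun"` retain their existing hashes; only passing `"torchrun"` produces a new job hash.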