From d3877715605d8fc28bce29fc2ec79d182022e7a5 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Thu, 24 Oct 2024 09:56:41 -0400 Subject: [PATCH 1/9] Adapt SLURM engine for multi-node jobs --- ..._utility_for_resource_management_engine.py | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index ff0e71a..01f4b47 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -173,8 +173,9 @@ def options(self, rqmt): out.append("--time=%s" % task_time) out.append("--export=all") - if rqmt.get("multi_node_slots", None): + if rqmt.get("multi_node_slots", 1) > 1: out.append("--ntasks=%s" % rqmt["multi_node_slots"]) + out.append("--nodes=%s" % rqmt["multi_node_slots"]) sbatch_args = rqmt.get("sbatch_args", []) if isinstance(sbatch_args, str): @@ -232,11 +233,15 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id, :param int step_size: """ name = self.process_task_name(name) - sbatch_call = ["sbatch", "-J", name, "-o", logpath + "/%x.%A.%a", "--mail-type=None"] + out_log_file = logpath + "/%x.%A.%t.%a" + sbatch_call = ["sbatch", "-J", name, "--mail-type=None"] sbatch_call += self.options(rqmt) + sbatch_call += [ + "-a", + f"{start_id}-{end_id}:{step_size}" + f"--wrap=srun -o {out_log_file} {' '.join(call)}", + ] - sbatch_call += ["-a", "%i-%i:%i" % (start_id, end_id, step_size)] - sbatch_call += ["--wrap=%s" % " ".join(call)] while True: try: out, err, retval = self.system_call(sbatch_call) @@ -393,19 +398,34 @@ def get_default_rqmt(self, task): def init_worker(self, task): # setup log file by linking to engine logfile - task_id = self.get_task_id(None) - logpath = os.path.relpath(task.path(gs.JOB_LOG, task_id)) + + # Naming ambiguity: sis "tasks" are what SLURM calls array jobs. + # + # SLURM tasks represent jobs that span multiple nodes at the same time + # (e.g. multi-node multi-GPU trainings consist of one SLURM task per node). + slurm_num_tasks = int( + next(filter(None, (os.getenv(var, None) for var in ["SLURM_NTASKS", "SLURM_NPROCS"])), "1") + ) + slurm_task_id = int(os.getenv("SLURM_PROCID", "0")) + + array_task_id = self.get_task_id(None) + # keep backwards compatibility: only change output file name for multi-SLURM-task jobs + log_suffix = array_task_id if slurm_num_tasks <= 1 else f"{array_task_id}.{slurm_task_id}" + logpath = os.path.relpath(task.path(gs.JOB_LOG, log_suffix)) if os.path.isfile(logpath): os.unlink(logpath) + job_ids = (os.getenv(name, None) for name in ["SLURM_JOB_ID", "SLURM_JOBID", "SLURM_ARRAY_JOB_ID"]) engine_logpath = ( os.path.dirname(logpath) + "/engine/" + os.getenv("SLURM_JOB_NAME") + "." - + os.getenv("SLURM_ARRAY_JOB_ID") + + next(filter(None, job_ids), "0") + + "." + + str(slurm_task_id) + "." - + os.getenv("SLURM_ARRAY_TASK_ID") + + os.getenv("SLURM_ARRAY_TASK_ID", "1") ) try: if os.path.isfile(engine_logpath): From 808eee417a6ac434918cb8f50f7890fc729f393f Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Thu, 24 Oct 2024 10:17:42 -0400 Subject: [PATCH 2/9] fix missing comma --- sisyphus/simple_linux_utility_for_resource_management_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index 01f4b47..9391faa 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -238,7 +238,7 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id, sbatch_call += self.options(rqmt) sbatch_call += [ "-a", - f"{start_id}-{end_id}:{step_size}" + f"{start_id}-{end_id}:{step_size}", f"--wrap=srun -o {out_log_file} {' '.join(call)}", ] From 398353a1f81da79b14c50eb6e4aa8d0d7134c4aa Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Thu, 24 Oct 2024 10:18:21 -0400 Subject: [PATCH 3/9] group call construction by purpose --- .../simple_linux_utility_for_resource_management_engine.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index 9391faa..5ed28b2 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -236,11 +236,8 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id, out_log_file = logpath + "/%x.%A.%t.%a" sbatch_call = ["sbatch", "-J", name, "--mail-type=None"] sbatch_call += self.options(rqmt) - sbatch_call += [ - "-a", - f"{start_id}-{end_id}:{step_size}", - f"--wrap=srun -o {out_log_file} {' '.join(call)}", - ] + sbatch_call += ["-a", f"{start_id}-{end_id}:{step_size}"] + sbatch_call += [f"--wrap=srun -o {out_log_file} {' '.join(call)}"] while True: try: From 496e73e61a2b3addec1b2bbe4168fa8134c7f537 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Thu, 24 Oct 2024 10:21:10 -0400 Subject: [PATCH 4/9] fix forced integer formatting --- sisyphus/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sisyphus/job.py b/sisyphus/job.py index 5243bdb..516b9d9 100644 --- a/sisyphus/job.py +++ b/sisyphus/job.py @@ -398,7 +398,7 @@ def _sis_path(self, path_type=None, task_id=None, abspath=False): # Add task id as suffix if task_id is not None: - path += ".%i" % task_id + path += f".{task_id}" if abspath and not os.path.isabs(path): path = os.path.join(gs.BASE_DIR, path) From aee2003183e84b1347debf4a2f658485f6c716f1 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Fri, 25 Oct 2024 05:08:38 -0400 Subject: [PATCH 5/9] also provide log path for sbatch call otherwise it will put an empty log file into your sis dir --- sisyphus/simple_linux_utility_for_resource_management_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index 5ed28b2..a247cd7 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -236,6 +236,7 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id, out_log_file = logpath + "/%x.%A.%t.%a" sbatch_call = ["sbatch", "-J", name, "--mail-type=None"] sbatch_call += self.options(rqmt) + sbatch_call += ["-o", f"{out_log_file}.batch"] sbatch_call += ["-a", f"{start_id}-{end_id}:{step_size}"] sbatch_call += [f"--wrap=srun -o {out_log_file} {' '.join(call)}"] From 6d29c5e05cbfa0824b5f7b87b7b4189442af0d4c Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Fri, 25 Oct 2024 05:47:37 -0400 Subject: [PATCH 6/9] symlink log file from previous format if required --- ..._utility_for_resource_management_engine.py | 57 +++++++++++++------ 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index a247cd7..6ef2cf9 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -413,26 +413,49 @@ def init_worker(self, task): if os.path.isfile(logpath): os.unlink(logpath) - job_ids = (os.getenv(name, None) for name in ["SLURM_JOB_ID", "SLURM_JOBID", "SLURM_ARRAY_JOB_ID"]) - engine_logpath = ( - os.path.dirname(logpath) - + "/engine/" - + os.getenv("SLURM_JOB_NAME") - + "." - + next(filter(None, job_ids), "0") - + "." - + str(slurm_task_id) - + "." - + os.getenv("SLURM_ARRAY_TASK_ID", "1") + job_id = next( + filter(None, (os.getenv(name, None) for name in ["SLURM_JOB_ID", "SLURM_JOBID", "SLURM_ARRAY_JOB_ID"])), "0" ) - try: - if os.path.isfile(engine_logpath): + has_linked_logfile = False + engine_logpath_candidates = [ + ( + os.path.dirname(logpath) + + "/engine/" + + os.getenv("SLURM_JOB_NAME") + + "." + + job_id + + "." + + str(slurm_task_id) + + "." + + os.getenv("SLURM_ARRAY_TASK_ID", "1") + ), + ( + os.path.dirname(logpath) + + "/engine/" + + os.getenv("SLURM_JOB_NAME") + + "." + + job_id + + "." + + os.getenv("SLURM_ARRAY_TASK_ID", "1") + ), + ] + for engine_logpath in engine_logpath_candidates: + if not os.path.isfile(engine_logpath): + continue + try: os.link(engine_logpath, logpath) - else: - logging.warning("Could not find engine logfile: %s Create soft link anyway." % engine_logpath) + has_linked_logfile = True + break + except FileExistsError: + pass + + if not has_linked_logfile: + engine_logpath = engine_logpath_candidates[0] + logging.warning("Could not find engine logfile: %s Create soft link anyway." % engine_logpath) + try: os.symlink(os.path.relpath(engine_logpath, os.path.dirname(logpath)), logpath) - except FileExistsError: - pass + except FileExistsError: + pass def get_logpath(self, logpath_base, task_name, task_id): """Returns log file for the currently running task""" From 909ae4c9ddcaf9c4a2e4d44b8725e7b8d7e43590 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Thu, 31 Oct 2024 06:01:46 -0400 Subject: [PATCH 7/9] only create multi-slurm-task path if needed --- ..._utility_for_resource_management_engine.py | 62 +++++++------------ 1 file changed, 22 insertions(+), 40 deletions(-) diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index 6ef2cf9..d5a5e29 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -233,7 +233,9 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id, :param int step_size: """ name = self.process_task_name(name) - out_log_file = logpath + "/%x.%A.%t.%a" + out_log_file = logpath + "/%x.%A.%a" + if rqmt.get("multi_node_slots", 1) > 1: + out_log_file += ".%t" sbatch_call = ["sbatch", "-J", name, "--mail-type=None"] sbatch_call += self.options(rqmt) sbatch_call += ["-o", f"{out_log_file}.batch"] @@ -405,8 +407,8 @@ def init_worker(self, task): next(filter(None, (os.getenv(var, None) for var in ["SLURM_NTASKS", "SLURM_NPROCS"])), "1") ) slurm_task_id = int(os.getenv("SLURM_PROCID", "0")) - array_task_id = self.get_task_id(None) + # keep backwards compatibility: only change output file name for multi-SLURM-task jobs log_suffix = array_task_id if slurm_num_tasks <= 1 else f"{array_task_id}.{slurm_task_id}" logpath = os.path.relpath(task.path(gs.JOB_LOG, log_suffix)) @@ -416,46 +418,26 @@ def init_worker(self, task): job_id = next( filter(None, (os.getenv(name, None) for name in ["SLURM_JOB_ID", "SLURM_JOBID", "SLURM_ARRAY_JOB_ID"])), "0" ) - has_linked_logfile = False - engine_logpath_candidates = [ - ( - os.path.dirname(logpath) - + "/engine/" - + os.getenv("SLURM_JOB_NAME") - + "." - + job_id - + "." - + str(slurm_task_id) - + "." - + os.getenv("SLURM_ARRAY_TASK_ID", "1") - ), - ( - os.path.dirname(logpath) - + "/engine/" - + os.getenv("SLURM_JOB_NAME") - + "." - + job_id - + "." - + os.getenv("SLURM_ARRAY_TASK_ID", "1") - ), - ] - for engine_logpath in engine_logpath_candidates: - if not os.path.isfile(engine_logpath): - continue - try: + engine_logpath = ( + os.path.dirname(logpath) + + "/engine/" + + os.getenv("SLURM_JOB_NAME") + + "." + + job_id + + "." + + os.getenv("SLURM_ARRAY_TASK_ID", "1") + ) + if slurm_num_tasks > 1: + engine_logpath += f".{slurm_task_id}" + + try: + if os.path.isfile(engine_logpath): os.link(engine_logpath, logpath) - has_linked_logfile = True - break - except FileExistsError: - pass - - if not has_linked_logfile: - engine_logpath = engine_logpath_candidates[0] - logging.warning("Could not find engine logfile: %s Create soft link anyway." % engine_logpath) - try: + else: + logging.warning("Could not find engine logfile: %s Create soft link anyway." % engine_logpath) os.symlink(os.path.relpath(engine_logpath, os.path.dirname(logpath)), logpath) - except FileExistsError: - pass + except FileExistsError: + pass def get_logpath(self, logpath_base, task_name, task_id): """Returns log file for the currently running task""" From 39fd9e6e45dec761ca5bdec3bd3eb1e1cb22dd09 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 4 Nov 2024 08:23:16 -0500 Subject: [PATCH 8/9] use f-string --- sisyphus/simple_linux_utility_for_resource_management_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index d5a5e29..0c61290 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -233,7 +233,7 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id, :param int step_size: """ name = self.process_task_name(name) - out_log_file = logpath + "/%x.%A.%a" + out_log_file = f"{logpath}/%x.%A.%a" if rqmt.get("multi_node_slots", 1) > 1: out_log_file += ".%t" sbatch_call = ["sbatch", "-J", name, "--mail-type=None"] From cbc0ed485e32809baf4167d74b6f6c9cb740c680 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 4 Nov 2024 09:50:56 -0500 Subject: [PATCH 9/9] clean up log suffix creation --- sisyphus/simple_linux_utility_for_resource_management_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index 0c61290..7815666 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -410,7 +410,7 @@ def init_worker(self, task): array_task_id = self.get_task_id(None) # keep backwards compatibility: only change output file name for multi-SLURM-task jobs - log_suffix = array_task_id if slurm_num_tasks <= 1 else f"{array_task_id}.{slurm_task_id}" + log_suffix = str(array_task_id) + (f".{slurm_task_id}" if slurm_num_tasks > 1 else "") logpath = os.path.relpath(task.path(gs.JOB_LOG, log_suffix)) if os.path.isfile(logpath): os.unlink(logpath)