diff --git a/docs/further.md b/docs/further.md
index f52c7c5..c7f95ae 100644
--- a/docs/further.md
+++ b/docs/further.md
@@ -104,22 +104,7 @@
 A workflow rule may support several
 [resource specifications](https://snakemake.readthedocs.io/en/latest/snakefiles/rules.html#resources).
 For a SLURM cluster, a mapping between Snakemake and SLURM needs to be performed.
-You can use the following specifications:
-
-| SLURM             | Snakemake          | Description                                                                 |
-|-------------------|--------------------|-----------------------------------------------------------------------------|
-| `--partition`     | `slurm_partition`  | the partition a rule/job is to use                                          |
-| `--time`          | `runtime`          | the walltime per job in minutes                                             |
-| `--constraint`    | `constraint`       | may hold features on some clusters                                          |
-| `--mem`           | `mem`, `mem_mb`    | memory a cluster node must provide (`mem`: string with unit, `mem_mb`: int) |
-| `--mem-per-cpu`   | `mem_mb_per_cpu`   | memory per reserved CPU                                                     |
-| `--ntasks`        | `tasks`            | number of concurrent tasks / ranks                                          |
-| `--cpus-per-task` | `cpus_per_task`    | number of cpus per task (in case of SMP, rather use `threads`)              |
-| `--nodes`         | `nodes`            | number of nodes                                                             |
-| `--clusters`      | `clusters`         | comma separated string of clusters                                          |
-
-Each of these can be part of a rule, e.g.:
+Each of the listed command line flags can be part of a rule, e.g.:
 
 ``` python
 rule:
@@ -158,16 +143,6 @@
 set-resources:
     cpus_per_task: 40
 ```
-#### Additional Command Line Flags
-
-This plugin defines additional command line flags.
-As always, these can be set on the command line or in a profile.
-
-| Flag | Meaning |
-|-------------|----------|
-| `--slurm_init_seconds_before_status_checks` | modify time before initial job status check; the default of 40 seconds avoids load on querying slurm databases, but shorter wait times are for example useful during workflow development |
-| `--slurm_requeue` | allows jobs to be resubmitted automatically if they fail or are preempted. See the [section "retries" for details](#retries) |
-
 #### Multicluster Support
 
 For reasons of scheduling, multicluster support is provided by the `clusters` flag in resources sections. Note that you have to write `clusters`, not `cluster`!
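+
+For example, a rule may offer its jobs to several clusters at once. A minimal sketch follows; the cluster names, file names, and the `analyse` command are purely illustrative:
+
+``` python
+rule:
+    input:
+        "data.txt"
+    output:
+        "result.txt"
+    resources:
+        clusters="cluster1,cluster2"
+    shell:
+        "analyse {input} > {output}"
+```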
@@ -279,6 +254,18 @@
 export SNAKEMAKE_PROFILE="$HOME/.config/snakemake"
 
 ==This is ongoing development. Eventually you will be able to annotate different file access patterns.==
 
+### Log Files - Getting Information on Failures
+
+Snakemake, via this SLURM executor, submits itself as a job. This ensures that all features are preserved in the job context. SLURM requires a log file to be written for _every_ job. This information is redundant: it merely duplicates the Snakemake output that is already printed on the terminal. If a rule is equipped with a `log` directive, the SLURM logs contain only Snakemake's own output.
+
+This executor therefore removes the SLURM log of every successful job as soon as the job has finished. You can change this behaviour with the flag `--slurm-keep-successful-logs`. The log file of a failed job is preserved for 10 days by default; you can change this period with the `--slurm-delete-logfiles-older-than` flag.
+
+The default location of these SLURM log files is relative to the directory where the workflow is started, or relative to the directory indicated with `--directory`. The logs can be redirected with `--slurm-logdir`. If you want to avoid log files accumulating in different working directories, you can store them in your home directory. In that case, it is best to put the parameter in your profile, e.g.:
+
+```YAML
+slurm-logdir: "/home/<username>/.snakemake/slurm_logs"
+```
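+
+The other log-related flags can be set in the profile in the same way; the retention period below is illustrative:
+
+```YAML
+slurm-keep-successful-logs: true
+slurm-delete-logfiles-older-than: 30
+```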
 
 ### Retries - Or Trying again when a Job failed
 
 Some cluster jobs may fail. In this case Snakemake can be instructed to try another submit before the entire workflow fails, in this example up to 3 times:
diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py
index e70b5a1..6fdfbb1 100644
--- a/snakemake_executor_plugin_slurm/__init__.py
+++ b/snakemake_executor_plugin_slurm/__init__.py
@@ -3,9 +3,11 @@
 __email__ = "johannes.koester@uni-due.de"
 __license__ = "MIT"
 
+import atexit
 import csv
 from io import StringIO
 import os
+from pathlib import Path
 import re
 import shlex
 import subprocess
@@ -26,18 +28,48 @@
 from snakemake_interface_common.exceptions import WorkflowError
 from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task
 
-from .utils import delete_slurm_environment
+from .utils import delete_slurm_environment, delete_empty_dirs
 
 
 @dataclass
 class ExecutorSettings(ExecutorSettingsBase):
+    logdir: Optional[Path] = field(
+        default=None,
+        metadata={
+            "help": "By default, the SLURM log directory is relative to "
+            "the working directory. "
+            "This flag allows setting an alternative directory.",
+            "env_var": False,
+            "required": False,
+        },
+    )
+    keep_successful_logs: bool = field(
+        default=False,
+        metadata={
+            "help": "By default, SLURM log files are deleted upon successful "
+            "completion of a job. Whenever a SLURM job fails, its log "
+            "file is preserved. "
+            "This flag allows keeping all SLURM log files, even those "
+            "of successful jobs.",
+            "env_var": False,
+            "required": False,
+        },
+    )
+    delete_logfiles_older_than: Optional[int] = field(
+        default=10,
+        metadata={
+            "help": "By default, SLURM log files in the SLURM log directory "
+            "of a workflow are deleted after 10 days. For this, "
+            "best leave the default log directory unaltered. "
+            "Setting this flag allows changing this behaviour. "
+            "If set to <=0, no old files will be deleted.",
+        },
+    )
     init_seconds_before_status_checks: Optional[int] = field(
         default=40,
         metadata={
-            "help": """
-            Defines the time in seconds before the first status
-            check is performed after job submission.
-            """,
+            "help": "Defines the time in seconds before the first status "
+            "check is performed after job submission.",
             "env_var": False,
             "required": False,
         },
@@ -45,11 +77,10 @@ class ExecutorSettings(ExecutorSettingsBase):
     requeue: bool = field(
         default=False,
         metadata={
-            "help": """
-            Allow requeuing preempted of failed jobs,
-            if no cluster default. Results in `sbatch ... --requeue ...`
-            This flag has no effect, if not set.
-            """,
+            "help": "Allow requeuing preempted or failed jobs, "
+            "if this is not the cluster default. Results in "
+            "`sbatch ... --requeue ...`. "
+            "This flag has no effect if not set.",
             "env_var": False,
             "required": False,
         },
@@ -91,6 +122,32 @@ def __post_init__(self):
         self._fallback_account_arg = None
         self._fallback_partition = None
         self._preemption_warning = False  # no preemption warning has been issued
+        self.slurm_logdir = None
+        atexit.register(self.clean_old_logs)
+
+    def clean_old_logs(self) -> None:
+        """Delete files older than the specified age from the SLURM log directory."""
+        # shorthands:
+        age_cutoff = self.workflow.executor_settings.delete_logfiles_older_than
+        keep_all = self.workflow.executor_settings.keep_successful_logs
+        # nothing to do if cleanup is disabled or no log directory was ever set up
+        if self.slurm_logdir is None or age_cutoff <= 0 or keep_all:
+            return
+        cutoff_secs = age_cutoff * 86400
+        current_time = time.time()
+        self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s)")
+        for path in self.slurm_logdir.rglob("*.log"):
+            if path.is_file():
+                try:
+                    file_age = current_time - path.stat().st_mtime
+                    if file_age > cutoff_secs:
+                        path.unlink()
+                except (OSError, FileNotFoundError) as e:
+                    self.logger.warning(f"Could not delete logfile {path}: {e}")
+        # a second pass is needed to remove directories left empty by the cleanup
+        try:
+            delete_empty_dirs(self.slurm_logdir)
+        except (OSError, FileNotFoundError) as e:
+            self.logger.warning(
+                f"Could not delete empty directories in {self.slurm_logdir}: {e}"
+            )
 
     def warn_on_jobcontext(self, done=None):
         if not done:
@@ -123,18 +180,22 @@ def run_job(self, job: JobExecutorInterface):
         except AttributeError:
             wildcard_str = ""
 
-        slurm_logfile = os.path.abspath(
-            f".snakemake/slurm_logs/{group_or_rule}/{wildcard_str}/%j.log"
+        self.slurm_logdir = (
+            Path(self.workflow.executor_settings.logdir)
+            if self.workflow.executor_settings.logdir
+            else Path(".snakemake/slurm_logs").resolve()
         )
-        logdir = os.path.dirname(slurm_logfile)
+
+        self.slurm_logdir.mkdir(parents=True, exist_ok=True)
+        slurm_logfile = self.slurm_logdir / group_or_rule / wildcard_str / "%j.log"
+        slurm_logfile.parent.mkdir(parents=True, exist_ok=True)
         # this behavior has been fixed in slurm 23.02, but there might be plenty of
         # older versions around, hence we should rather be conservative here.
-        assert "%j" not in logdir, (
+        assert "%j" not in str(self.slurm_logdir), (
             "bug: jobid placeholder in parent dir of logfile. This does not work as "
             "we have to create that dir before submission in order to make sbatch "
             "happy. Otherwise we get silent fails without logfiles being created."
         )
-        os.makedirs(logdir, exist_ok=True)
 
         # generic part of a submission string:
         # we use a run_uuid as the job-name, to allow `--name`-based
@@ -247,7 +308,9 @@
         slurm_jobid = out.strip().split(";")[0]
         if not slurm_jobid:
             raise WorkflowError("Failed to retrieve SLURM job ID from sbatch output.")
-        slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
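+        # our record of the log file name still contains sbatch's "%j"
+        # placeholder; substitute the job id just parsed so the path
+        # points at the file SLURM actually writes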
+        slurm_logfile = slurm_logfile.with_name(
+            slurm_logfile.name.replace("%j", slurm_jobid)
+        )
         self.logger.info(
             f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
             f"(log: {slurm_logfile})."
@@ -380,6 +443,19 @@ async def check_active_jobs(
                     self.report_job_success(j)
                     any_finished = True
                     active_jobs_seen_by_sacct.remove(j.external_jobid)
+                    if not self.workflow.executor_settings.keep_successful_logs:
+                        self.logger.debug(
+                            "removing log for successful job "
+                            f"with SLURM ID '{j.external_jobid}'"
+                        )
+                        try:
+                            if j.aux["slurm_logfile"].exists():
+                                j.aux["slurm_logfile"].unlink()
+                        except (OSError, FileNotFoundError) as e:
+                            self.logger.warning(
+                                "Could not remove log file"
+                                f" {j.aux['slurm_logfile']}: {e}"
+                            )
                 elif status == "PREEMPTED" and not self._preemption_warning:
                     self._preemption_warning = True
                     self.logger.warning(
@@ -404,7 +480,9 @@
                         # with a new sentence
                         f"'{status}'. "
                     )
-                    self.report_job_error(j, msg=msg, aux_logs=[j.aux["slurm_logfile"]])
+                    self.report_job_error(
+                        j, msg=msg, aux_logs=[str(j.aux["slurm_logfile"])]
+                    )
                     active_jobs_seen_by_sacct.remove(j.external_jobid)
                 else:  # still running?
                     yield j
diff --git a/snakemake_executor_plugin_slurm/utils.py b/snakemake_executor_plugin_slurm/utils.py
index 50eb1f4..3f65dc3 100644
--- a/snakemake_executor_plugin_slurm/utils.py
+++ b/snakemake_executor_plugin_slurm/utils.py
@@ -1,6 +1,7 @@
 # utility functions for the SLURM executor plugin
 
 import os
+from pathlib import Path
 
 
 def delete_slurm_environment():
@@ -14,3 +15,28 @@
     for var in os.environ:
         if var.startswith("SLURM_"):
             del os.environ[var]
+
+
+def delete_empty_dirs(path: Path) -> None:
+    """
+    Delete all empty directories below (and including) the given path.
+    This is needed to clean up the SLURM log directory after jobs
+    have successfully finished: deleting the log files alone would
+    leave a tree of empty per-rule directories behind.
+    """
+    if not path.is_dir():
+        return
+
+    # Process subdirectories first (bottom-up)
+    for child in path.iterdir():
+        if child.is_dir():
+            delete_empty_dirs(child)
+
+    try:
+        # Check if the directory is now empty after processing its children
+        if not any(path.iterdir()):
+            path.rmdir()
+    except (OSError, FileNotFoundError) as e:
+        # Provide more context in the error message
+        raise OSError(f"Failed to remove empty directory {path}: {e}") from e
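
A quick way to see `delete_empty_dirs` in action is the following sketch; it assumes only the standard library and an importable checkout of the plugin, and the directory names are made up:

``` python
from pathlib import Path
from tempfile import TemporaryDirectory

from snakemake_executor_plugin_slurm.utils import delete_empty_dirs

with TemporaryDirectory() as tmp:
    root = Path(tmp) / "slurm_logs"
    # one branch that ends up empty, one that still holds a log file
    (root / "rule_a" / "sample1").mkdir(parents=True)
    (root / "rule_b").mkdir(parents=True)
    (root / "rule_b" / "12345.log").write_text("kept")

    delete_empty_dirs(root)

    assert not (root / "rule_a").exists()  # empty branch pruned, bottom-up
    assert (root / "rule_b" / "12345.log").exists()  # non-empty branch kept
```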