feat: custom log file behaviour (#159)
This PR allows users to

- set a custom log directory for SLURM jobs
- enable auto-deletion of older SLURM log files after 10 days
- change that behaviour or disable auto-deletion altogether
- auto-delete log files of successful SLURM jobs (with a flag to opt out of
  this default behaviour)

The idea behind this PR is to
- limit the number of SLURM log files to keep, as a workflow can easily
produce thousands of log files. In the case of a successful job, the
information is redundant anyway, hence the proposed auto-deletion of
logs for successful jobs.
- let users select a custom directory for SLURM log files - this way
different workflows can point to one prefix. Together with the
auto-deletion of older log files, this further limits the number of
log files present.

It addresses issues #94 and #123.
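The age-based retention policy described above can be sketched as a small standalone helper. The function and parameter names here are illustrative only, not the plugin's actual API:

```python
import time
from pathlib import Path


def clean_old_logs(logdir: Path, max_age_days: int = 10) -> None:
    """Delete *.log files older than max_age_days; <= 0 disables cleanup."""
    if max_age_days <= 0:
        return
    cutoff = time.time() - max_age_days * 86400
    for logfile in logdir.rglob("*.log"):
        if logfile.is_file() and logfile.stat().st_mtime < cutoff:
            try:
                logfile.unlink()
            except OSError:
                pass  # file may have been removed concurrently
```

Passing a non-positive `max_age_days` turns the cleanup off, mirroring the `<=0` semantics of the new `--slurm-delete-logfiles-older-than` flag.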

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Enhanced SLURM job log management with a configurable log directory.
  - Added options to control log file retention and automatic cleanup of
    old logs.
  - Introduced new command-line flags for specifying log management
    settings.

- **Documentation**
  - Updated documentation with new command-line flags for log management.
  - Improved guidance on managing job logs and their retention policies.

- **Improvements**
  - More flexible control over SLURM job log handling.
  - Better support for cleaning up empty directories after job completion.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Johannes Köster <[email protected]>
3 people authored Jan 8, 2025
1 parent 531ebc6 commit cc3d21b
Showing 3 changed files with 134 additions and 43 deletions.
39 changes: 13 additions & 26 deletions docs/further.md
@@ -104,22 +104,7 @@ A workflow rule may support several
[resource specifications](https://snakemake.readthedocs.io/en/latest/snakefiles/rules.html#resources).
For a SLURM cluster, a mapping between Snakemake and SLURM needs to be performed.

You can use the following specifications:

| SLURM             | Snakemake         | Description                                                                    |
|-------------------|-------------------|--------------------------------------------------------------------------------|
| `--partition`     | `slurm_partition` | the partition a rule/job is to use                                             |
| `--time`          | `runtime`         | the walltime per job in minutes                                                |
| `--constraint`    | `constraint`      | may hold features on some clusters                                             |
| `--mem`           | `mem`, `mem_mb`   | memory a cluster node must provide (`mem`: string with unit, `mem_mb`: in MiB) |
| `--mem-per-cpu`   | `mem_mb_per_cpu`  | memory per reserved CPU                                                        |
| `--ntasks`        | `tasks`           | number of concurrent tasks / ranks                                             |
| `--cpus-per-task` | `cpus_per_task`   | number of CPUs per task (in case of SMP, rather use `threads`)                 |
| `--nodes`         | `nodes`           | number of nodes                                                                |
| `--clusters`      | `clusters`        | comma-separated string of clusters                                             |
Each of the listed command line flags can be part of a rule, e.g.:

``` python
rule:
@@ -158,16 +143,6 @@ set-resources:
cpus_per_task: 40
```
#### Additional Command Line Flags

This plugin defines additional command line flags.
As always, these can be set on the command line or in a profile.

| Flag | Meaning |
|------|---------|
| `--slurm-init-seconds-before-status-checks` | modify the waiting time before the initial job status check; the default of 40 seconds avoids load on the SLURM database, but shorter wait times are useful, for example, during workflow development |
| `--slurm-requeue` | allow jobs to be resubmitted automatically if they fail or are preempted; see the [section "retries"](#retries) for details |

#### Multicluster Support
For scheduling reasons, multicluster support is provided via the `clusters` flag in a rule's resources section. Note that you have to write `clusters`, not `cluster`!
@@ -279,6 +254,18 @@ export SNAKEMAKE_PROFILE="$HOME/.config/snakemake"

==This is ongoing development. Eventually you will be able to annotate different file access patterns.==

### Log Files - Getting Information on Failures

Snakemake, via this SLURM executor, submits itself as a job. This ensures that all features are preserved in the job context. SLURM requires a log file to be written for _every_ job. This information is redundant, as it only contains the Snakemake output already printed on the terminal. If a rule is equipped with a `log` directive, SLURM logs only contain Snakemake's output.

This executor will remove SLURM logs of successful jobs immediately after they finish. You can change this behaviour with the flag `--slurm-keep-successful-logs`. A log file for a failed job will be preserved for 10 days by default. You may change this value using the `--slurm-delete-logfiles-older-than` flag.

The default location of Snakemake log files is relative to the directory where the workflow is started, or relative to the directory indicated with `--directory`. SLURM logs produced by Snakemake can be redirected using `--slurm-logdir`. If you want to avoid log files accumulating in different directories, you can store them in your home directory. In that case, it is best to put the parameter in your profile, e.g.:

```YAML
slurm-logdir: "/home/<username>/.snakemake/slurm_logs"
```

### Retries - Or Trying again when a Job failed

Some cluster jobs may fail. In this case, Snakemake can be instructed to attempt another submission before the entire workflow fails; in this example, up to 3 times:
112 changes: 95 additions & 17 deletions snakemake_executor_plugin_slurm/__init__.py
@@ -3,9 +3,11 @@
__email__ = "[email protected]"
__license__ = "MIT"

import atexit
import csv
from io import StringIO
import os
from pathlib import Path
import re
import shlex
import subprocess
@@ -26,30 +28,59 @@
from snakemake_interface_common.exceptions import WorkflowError
from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task

from .utils import delete_slurm_environment
from .utils import delete_slurm_environment, delete_empty_dirs


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
    logdir: Optional[Path] = field(
        default=None,
        metadata={
            "help": "By default, the SLURM log directory is relative to "
            "the working directory. "
            "This flag allows setting an alternative directory.",
            "env_var": False,
            "required": False,
        },
    )
    keep_successful_logs: bool = field(
        default=False,
        metadata={
            "help": "By default, SLURM log files will be deleted upon successful "
            "completion of a job. Whenever a SLURM job fails, its log "
            "file will be preserved. "
            "This flag allows keeping all SLURM log files, even those "
            "of successful jobs.",
            "env_var": False,
            "required": False,
        },
    )
    delete_logfiles_older_than: Optional[int] = field(
        default=10,
        metadata={
            "help": "By default, SLURM log files in the SLURM log directory "
            "of a workflow will be deleted after 10 days. For this to work "
            "reliably, it is best to leave the default log directory unaltered. "
            "Setting this flag allows changing this behaviour. "
            "If set to <=0, no old files will be deleted.",
        },
    )
    init_seconds_before_status_checks: Optional[int] = field(
        default=40,
        metadata={
            "help": "Defines the time in seconds before the first status "
            "check is performed after job submission.",
            "env_var": False,
            "required": False,
        },
    )
    requeue: bool = field(
        default=False,
        metadata={
            "help": "Allow requeuing of preempted or failed jobs, "
            "if this is not the cluster default. Results in "
            "`sbatch ... --requeue ...`. "
            "This flag has no effect if not set.",
            "env_var": False,
            "required": False,
        },
@@ -91,6 +122,32 @@ def __post_init__(self):
        self._fallback_account_arg = None
        self._fallback_partition = None
        self._preemption_warning = False  # no preemption warning has been issued
        self.slurm_logdir = None
        atexit.register(self.clean_old_logs)

    def clean_old_logs(self) -> None:
        """Delete files older than the specified age from the SLURM log directory."""
        # shorthands:
        age_cutoff = self.workflow.executor_settings.delete_logfiles_older_than
        keep_all = self.workflow.executor_settings.keep_successful_logs
        if self.slurm_logdir is None or age_cutoff <= 0 or keep_all:
            # no log directory was set (no job submitted) or cleanup is disabled
            return
        cutoff_secs = age_cutoff * 86400
        current_time = time.time()
        self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s)")
        for path in self.slurm_logdir.rglob("*.log"):
            if path.is_file():
                try:
                    file_age = current_time - path.stat().st_mtime
                    if file_age > cutoff_secs:
                        path.unlink()
                except (OSError, FileNotFoundError) as e:
                    self.logger.warning(f"Could not delete logfile {path}: {e}")
        # we need a 2nd iteration to remove putatively empty directories
        try:
            delete_empty_dirs(self.slurm_logdir)
        except (OSError, FileNotFoundError) as e:
            self.logger.warning(
                f"Could not delete empty directories in {self.slurm_logdir}: {e}"
            )

def warn_on_jobcontext(self, done=None):
if not done:
@@ -123,18 +180,22 @@ def run_job(self, job: JobExecutorInterface):
        except AttributeError:
            wildcard_str = ""

        self.slurm_logdir = (
            Path(self.workflow.executor_settings.logdir)
            if self.workflow.executor_settings.logdir
            else Path(".snakemake/slurm_logs").resolve()
        )
        self.slurm_logdir.mkdir(parents=True, exist_ok=True)
        slurm_logfile = self.slurm_logdir / group_or_rule / wildcard_str / "%j.log"
        slurm_logfile.parent.mkdir(parents=True, exist_ok=True)
        # this behavior has been fixed in slurm 23.02, but there might be plenty of
        # older versions around, hence we should rather be conservative here.
        assert "%j" not in str(self.slurm_logdir), (
            "bug: jobid placeholder in parent dir of logfile. This does not work as "
            "we have to create that dir before submission in order to make sbatch "
            "happy. Otherwise we get silent fails without logfiles being created."
        )

        # generic part of a submission string:
        # we use a run_uuid as the job-name, to allow `--name`-based
@@ -247,7 +308,9 @@ def run_job(self, job: JobExecutorInterface):
        slurm_jobid = out.strip().split(";")[0]
        if not slurm_jobid:
            raise WorkflowError("Failed to retrieve SLURM job ID from sbatch output.")
        slurm_logfile = slurm_logfile.with_name(
            slurm_logfile.name.replace("%j", slurm_jobid)
        )
        self.logger.info(
            f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
            f"(log: {slurm_logfile})."
@@ -380,6 +443,19 @@ async def check_active_jobs(
self.report_job_success(j)
any_finished = True
active_jobs_seen_by_sacct.remove(j.external_jobid)
if not self.workflow.executor_settings.keep_successful_logs:
self.logger.debug(
"removing log for successful job "
f"with SLURM ID '{j.external_jobid}'"
)
try:
if j.aux["slurm_logfile"].exists():
j.aux["slurm_logfile"].unlink()
except (OSError, FileNotFoundError) as e:
self.logger.warning(
"Could not remove log file"
f" {j.aux['slurm_logfile']}: {e}"
)
elif status == "PREEMPTED" and not self._preemption_warning:
self._preemption_warning = True
self.logger.warning(
@@ -404,7 +480,9 @@
# with a new sentence
f"'{status}'. "
)
self.report_job_error(j, msg=msg, aux_logs=[j.aux["slurm_logfile"]])
self.report_job_error(
j, msg=msg, aux_logs=[str(j.aux["slurm_logfile"])]
)
active_jobs_seen_by_sacct.remove(j.external_jobid)
else: # still running?
yield j
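The `%j` placeholder handling above can be illustrated in isolation: SLURM expands `%j` to the job ID when writing the log, so once `sbatch` has returned the ID, the plugin resolves the pattern with `Path.with_name()`. The paths and job ID below are made up for the example:

```python
from pathlib import Path

# Log file pattern handed to sbatch; SLURM substitutes %j itself.
slurm_logfile = Path("/tmp/slurm_logs/myrule/%j.log")

# Once sbatch reports the job ID, resolve the pattern the same way
# the diff does, replacing %j only in the file name:
slurm_jobid = "12345"
resolved = slurm_logfile.with_name(slurm_logfile.name.replace("%j", slurm_jobid))
print(resolved)  # e.g. /tmp/slurm_logs/myrule/12345.log on POSIX
```

Replacing only in `name` (rather than in the whole path string) is what makes the preceding assertion safe: a `%j` in a parent directory would still be caught.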
26 changes: 26 additions & 0 deletions snakemake_executor_plugin_slurm/utils.py
@@ -1,6 +1,7 @@
# utility functions for the SLURM executor plugin

import os
from pathlib import Path


def delete_slurm_environment():
@@ -14,3 +15,28 @@ def delete_slurm_environment():
    for var in os.environ:
        if var.startswith("SLURM_"):
            del os.environ[var]


def delete_empty_dirs(path: Path) -> None:
    """
    Delete all empty directories below (and including) a given path.

    This is needed to clean up the log directory after a job
    has successfully finished. A dedicated function is needed because
    shutil.rmtree() would also delete non-empty directories, while
    os.rmdir() only removes a single empty directory.
    """
    if not path.is_dir():
        return

    # Process subdirectories first (bottom-up)
    for child in path.iterdir():
        if child.is_dir():
            delete_empty_dirs(child)

    try:
        # Check if directory is now empty after processing children
        if not any(path.iterdir()):
            path.rmdir()
    except (OSError, FileNotFoundError) as e:
        # Provide more context in the error message
        raise OSError(f"Failed to remove empty directory {path}: {e}") from e
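A minimal usage sketch of this bottom-up cleanup; the helper is re-declared (slightly simplified, without the error wrapping) so the snippet runs standalone:

```python
import tempfile
from pathlib import Path


def delete_empty_dirs(path: Path) -> None:
    """Recursively remove empty directories below (and including) path."""
    if not path.is_dir():
        return
    # Materialize the listing so removals do not disturb iteration.
    for child in list(path.iterdir()):
        if child.is_dir():
            delete_empty_dirs(child)
    if not any(path.iterdir()):
        path.rmdir()


root = Path(tempfile.mkdtemp())
(root / "a" / "b").mkdir(parents=True)       # chain of empty directories
(root / "c").mkdir()
(root / "c" / "job.log").write_text("kept")  # non-empty directory survives
delete_empty_dirs(root)
```

After the call, the empty `a/b` chain is gone while `c` (holding a log file) and `root` itself remain, which is exactly the behaviour the plugin relies on when pruning its log directory.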
