Skip to content

Commit

Permalink
fix: using atexit to decouple the function from __del__, moved all co…
Browse files Browse the repository at this point in the history
…de within __postinit__
  • Loading branch information
cmeesters committed Nov 19, 2024
1 parent 34adb4e commit 730433f
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 59 deletions.
132 changes: 91 additions & 41 deletions snakemake_executor_plugin_slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
__email__ = "[email protected]"
__license__ = "MIT"

import atexit
import csv
from io import StringIO
import os
Expand All @@ -26,18 +27,53 @@
from snakemake_interface_common.exceptions import WorkflowError
from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task

from .utils import delete_slurm_environment
from .utils import delete_slurm_environment, clean_old_logs


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
logdir: Optional[str] = field(
default=f"/home/{os.environ['USER']}/.snakemake/slurm_logs",
metadata={
"help": """
Per default the SLURM log directory (writing output is required by SLURM)
is '~/.snakemake/slurm_logs'. This flag allows to set an alternative
directory.
""",
"env_var": False,
"required": False,
},
)
keep_successful_logs: bool = field(
default=False,
metadata={
"help": """
Per default SLURM log files will be deleted upon sucessful completion
of a job. Whenever a SLURM job fails, its log file will be preserved.
This flag allows to keep all SLURM log files, even those of successful
jobs.
""",
"env_var": False,
"required": False,
},
)
delete_logfiles_older_than: Optional[int] = field(
default=10,
metadata={
"help": """
Per default SLURM log files in the SLURM log directory of a workflow
will be deleted after 10 days. Setting this flag allows to change this behaviour.
If set to '0', no old files will be deleted.
"""
},
)
init_seconds_before_status_checks: Optional[int] = field(
default=40,
metadata={
"help": """
Defines the time in seconds before the first
status check is performed after job submission.
""",
Defines the time in seconds before the first status
check is performed after job submission.
""",
"env_var": False,
"required": False,
},
Expand All @@ -47,8 +83,7 @@ class ExecutorSettings(ExecutorSettingsBase):
metadata={
"help": """
Allow requeuing preempted of failed jobs,
if no cluster default. Results in
`sbatch ... --requeue ...`.
if no cluster default. Results in `sbatch ... --requeue ...`
This flag has no effect, if not set.
""",
"env_var": False,
Expand All @@ -63,8 +98,7 @@ class ExecutorSettings(ExecutorSettingsBase):
# define whether your executor plugin executes locally
# or remotely. In virtually all cases, it will be remote execution
# (cluster, cloud, etc.). Only Snakemake's standard execution
# plugins (snakemake-executor-plugin-dryrun,
# snakemake-executor-plugin-local)
# plugins (snakemake-executor-plugin-dryrun, snakemake-executor-plugin-local)
# are expected to specify False here.
non_local_exec=True,
# Define whether your executor plugin implies that there is no shared
Expand Down Expand Up @@ -94,6 +128,36 @@ def __post_init__(self):
self._fallback_partition = None
self._preemption_warning = False # no preemption warning has been issued

def clean_old_logs(logdir, age_cutoff):
"""
Function to delete files older than 'age_cutoff'
in the SLURM 'logdir'
"""
if age_cutoff <= 0:
return
cutoff_secs = age_cutoff * 86400
current_time = time.time()
self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s)")

for root, _, files in os.walk(logdir, topdown=False):
for fname in files:
file_path = os.path.join(root, fname)
try:
file_age = current_time - os.stat(file_path).st_mtime
if file_age > cutoff_secs:
os.remove(file_path)
except (OSError, FileNotFoundError) as e:
self.logger.warning(f"Could not delete file {file_path}: {e}")
# remove empty rule top dir, if empty
if len(os.listdir(root)) == 0:
os.rmdir(root)

atexit.register(
clean_old_logs,
self.workflow.executor_settings.logdir,
self.workflow.executor_settings.delete_logfiles_older_than,
)

def warn_on_jobcontext(self, done=None):
if not done:
if "SLURM_JOB_ID" in os.environ:
Expand All @@ -106,33 +170,12 @@ def warn_on_jobcontext(self, done=None):
delete_slurm_environment()
done = True

# def delete_old_logs(self):
# self.workflow.executor_settings.delete_logfiles_older_than

def additional_general_args(self):
return "--executor slurm-jobstep --jobs 1"

def run_jobs(
self,
jobs: List[JobExecutorInterface],
):
"""Run a list of jobs that is ready at a given point in time.
By default, this method just runs each job individually.
This method is overwritten in the executor plugin to implement
- SLURM array jobs.
- pooled jobs (currently in planning)
"""
self.logger.debug(f"jobs: {jobs}")
# self.logger.debug(f"job attributes: {dir(jobs[0])}")
self.logger.debug(f"jobs length: {len(jobs)}")
# sys.exit(1)
# g = groupby(jobs, key=Job.rule)
# self.logger.debug(f"Jobs grouped by rule: {g}")
# sys.exit(1)
for job in jobs:
if job.name == "foo":
break
self.run_job_pre(job)
self.run_job(job)

def run_job(self, job: JobExecutorInterface):
# Implement here how to run a job.
# You can access the job's resources, etc.
Expand All @@ -149,26 +192,29 @@ def run_job(self, job: JobExecutorInterface):
except AttributeError:
wildcard_str = ""

slurm_logfile = os.path.abspath(
f".snakemake/slurm_logs/{group_or_rule}/{wildcard_str}/%j.log"
slurm_logfile = (
self.workflow.executor_settings.logdir
+ os.path.sep
+ f"{group_or_rule}/{wildcard_str}/%j.log"
)
logdir = os.path.dirname(slurm_logfile)

slurm_logdir = os.path.dirname(slurm_logfile)
# this behavior has been fixed in slurm 23.02, but there might be plenty of
# older versions around, hence we should rather be conservative here.
assert "%j" not in logdir, (
assert "%j" not in slurm_logdir, (
"bug: jobid placeholder in parent dir of logfile. This does not work as "
"we have to create that dir before submission in order to make sbatch "
"happy. Otherwise we get silent fails without logfiles being created."
)
os.makedirs(logdir, exist_ok=True)
os.makedirs(slurm_logdir, exist_ok=True)

# generic part of a submission string:
# we use a run_uuid as the job-name, to allow `--name`-based
# filtering in the job status checks (`sacct --name` and `squeue --name`)
if wildcard_str == "":
comment_str = f"{group_or_rule}"
comment_str = f"rule_{job.name}"
else:
comment_str = f"{group_or_rule}_wildcards_{wildcard_str}"
comment_str = f"rule_{job.name}_wildcards_{wildcard_str}"
call = (
f"sbatch "
f"--parsable "
Expand Down Expand Up @@ -406,6 +452,11 @@ async def check_active_jobs(
self.report_job_success(j)
any_finished = True
active_jobs_seen_by_sacct.remove(j.external_jobid)
if not self.workflow.executor_settings.keep_successful_logs:
self.logger.debug(
f"removing log for successful job with SLURM ID '{j.external_jobid}'"
)
os.remove(j.aux["slurm_logfile"])
elif status == "PREEMPTED" and not self._preemption_warning:
self._preemption_warning = True
self.logger.warning(
Expand Down Expand Up @@ -501,8 +552,7 @@ async def job_stati(self, command):
}
except subprocess.CalledProcessError as e:
self.logger.error(
"The job status query failed with"
f" command: {command}\n"
f"The job status query failed with command: {command}\n"
f"Error message: {e.stderr.strip()}\n"
)
pass
Expand Down
22 changes: 4 additions & 18 deletions snakemake_executor_plugin_slurm/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# utility functions for the SLURM executor plugin

import os
import time
import logging

logger = logging.getLogger(__name__)


def delete_slurm_environment():
Expand All @@ -14,21 +18,3 @@ def delete_slurm_environment():
for var in os.environ:
if var.startswith("SLURM_"):
del os.environ[var]


def clean_old_logs(logdir, age_cutoff):
"""
Function to delete files older than 'age_cutoff'
in the SLURM 'logdir'
"""
cutoff_secs = age_cutoff * 86400
for root, _, files in os.walk(logdir, topdown=False):
for fname in files:
file_path = os.path.join(root, fname)
if age_cutoff > 0:
filestamp = os.stat(file_path).st_mtime
if filestamp > cutoff_secs:
os.remove(file_path)
# remove empty rule top dir, if empty
if len(os.listdir(root)) == 0:
os.rmdir(root)

0 comments on commit 730433f

Please sign in to comment.