Skip to content

Commit

Permalink
Merge pull request #6533 from hjoliver/fix-job-runner-mgr
Browse files Browse the repository at this point in the history
Fix unlinking old job out/err files.
  • Loading branch information
wxtim authored Jan 8, 2025
2 parents ab0042a + 2dcea16 commit 874bf8d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 6 deletions.
16 changes: 10 additions & 6 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,15 @@ def job_kill(self, st_file_path):

@classmethod
def _create_nn(cls, job_file_path):
"""Create NN symbolic link, if necessary.
"""Create NN symbolic link if necessary, and remove any old job logs.
If NN => 01, remove numbered dirs with submit numbers greater than 01.
If NN => 01, remove numbered directories with submit numbers greater
than 01.
Helper for "self._job_submit_impl".
"""
job_file_dir = os.path.dirname(job_file_path)

source = os.path.basename(job_file_dir)
task_log_dir = os.path.dirname(job_file_dir)
nn_path = os.path.join(task_log_dir, "NN")
Expand All @@ -393,6 +394,7 @@ def _create_nn(cls, job_file_path):
old_source = None
if old_source is None:
os.symlink(source, nn_path)

# On submit 1, remove any left over digit directories from prev runs
if source == "01":
for name in os.listdir(task_log_dir):
Expand All @@ -401,6 +403,11 @@ def _create_nn(cls, job_file_path):
rmtree(
os.path.join(task_log_dir, name), ignore_errors=True)

# Delete old job logs if necessary
for name in JOB_LOG_ERR, JOB_LOG_OUT:
with suppress(FileNotFoundError):
os.unlink(os.path.join(job_file_dir, name))

@classmethod
def _filter_submit_output(cls, st_file_path, job_runner, out, err):
"""Filter submit command output, if relevant."""
Expand Down Expand Up @@ -567,9 +574,6 @@ def _job_submit_impl(

# Create NN symbolic link, if necessary
self._create_nn(job_file_path)
for name in JOB_LOG_ERR, JOB_LOG_OUT:
with suppress(OSError):
os.unlink(os.path.join(job_file_path, name))

# Start new status file
with open(f"{job_file_path}.status", "w") as job_status_file:
Expand Down
51 changes: 51 additions & 0 deletions tests/integration/test_job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from cylc.flow.pathutil import get_workflow_run_job_dir
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.subprocctx import SubProcContext
from cylc.flow.task_job_logs import JOB_LOG_OUT, JOB_LOG_ERR


async def test_kill_error(one, start, test_dir, capsys, log_filter):
Expand Down Expand Up @@ -82,3 +83,53 @@ async def test_kill_error(one, start, test_dir, capsys, log_filter):
level=logging.WARNING,
)
assert itask.state(TASK_STATUS_RUNNING)


async def test_create_nn_new(one, start):
"""Test _create_nn.
It should create the NN symlink.
"""
async with start(one):
itask = one.pool.get_tasks()[0]

workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow))
job_id = itask.tokens.duplicate(job='01').relative_id
job_log_dir = Path(workflow_job_log_dir, job_id)
job_log_dir.mkdir(parents=True)

# call _create_nn
JobRunnerManager()._create_nn(job_log_dir / 'job')

# check the symlink exists
assert (job_log_dir.parent / "NN").is_symlink()


async def test_create_nn_old(one, start):
"""Test _create_nn.
It should remove existing job logs, if the dir already exists.
"""
async with start(one):
itask = one.pool.get_tasks()[0]

# fake some old job logs
workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow))
job_id = itask.tokens.duplicate(job='01').relative_id
job_log_dir = Path(workflow_job_log_dir, job_id)
job_log_dir.mkdir(parents=True)

job_logs = []
for name in JOB_LOG_OUT, JOB_LOG_ERR:
job_logs.append(job_log_dir / name)

# create the logs
for job_log in job_logs:
job_log.touch()

# call _create_nn
JobRunnerManager()._create_nn(job_log_dir / 'job')

# check they were removed
for job_log in job_logs:
assert not job_log.is_file()

0 comments on commit 874bf8d

Please sign in to comment.