diff --git a/cylc/flow/job_runner_mgr.py b/cylc/flow/job_runner_mgr.py index b8ddaaa053..558a3d158b 100644 --- a/cylc/flow/job_runner_mgr.py +++ b/cylc/flow/job_runner_mgr.py @@ -373,14 +373,15 @@ def job_kill(self, st_file_path): @classmethod def _create_nn(cls, job_file_path): - """Create NN symbolic link, if necessary. + """Create NN symbolic link if necessary, and remove any old job logs. + + If NN => 01, remove numbered dirs with submit numbers greater than 01. - If NN => 01, remove numbered directories with submit numbers greater - than 01. Helper for "self._job_submit_impl". """ job_file_dir = os.path.dirname(job_file_path) + source = os.path.basename(job_file_dir) task_log_dir = os.path.dirname(job_file_dir) nn_path = os.path.join(task_log_dir, "NN") @@ -393,6 +394,7 @@ def _create_nn(cls, job_file_path): old_source = None if old_source is None: os.symlink(source, nn_path) + # On submit 1, remove any left over digit directories from prev runs if source == "01": for name in os.listdir(task_log_dir): @@ -401,6 +403,11 @@ def _create_nn(cls, job_file_path): rmtree( os.path.join(task_log_dir, name), ignore_errors=True) + # Delete old job logs if necessary + for name in JOB_LOG_ERR, JOB_LOG_OUT: + with suppress(FileNotFoundError): + os.unlink(os.path.join(job_file_dir, name)) + @classmethod def _filter_submit_output(cls, st_file_path, job_runner, out, err): """Filter submit command output, if relevant.""" @@ -567,9 +574,6 @@ def _job_submit_impl( # Create NN symbolic link, if necessary self._create_nn(job_file_path) - for name in JOB_LOG_ERR, JOB_LOG_OUT: - with suppress(OSError): - os.unlink(os.path.join(job_file_path, name)) # Start new status file with open(f"{job_file_path}.status", "w") as job_status_file: diff --git a/tests/integration/test_job_runner_mgr.py b/tests/integration/test_job_runner_mgr.py index 39db087f7b..35858ebbd4 100644 --- a/tests/integration/test_job_runner_mgr.py +++ b/tests/integration/test_job_runner_mgr.py @@ -24,6 +24,7 @@ from cylc.flow.pathutil import get_workflow_run_job_dir from cylc.flow.task_state import TASK_STATUS_RUNNING from cylc.flow.subprocctx import SubProcContext +from cylc.flow.task_job_logs import JOB_LOG_OUT, JOB_LOG_ERR async def test_kill_error(one, start, test_dir, capsys, log_filter): @@ -82,3 +83,53 @@ async def test_kill_error(one, start, test_dir, capsys, log_filter): level=logging.WARNING, ) assert itask.state(TASK_STATUS_RUNNING) + + +async def test_create_nn_new(one, start): + """Test _create_nn. + + It should create the NN symlink. + """ + async with start(one): + itask = one.pool.get_tasks()[0] + + workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow)) + job_id = itask.tokens.duplicate(job='01').relative_id + job_log_dir = Path(workflow_job_log_dir, job_id) + job_log_dir.mkdir(parents=True) + + # call _create_nn + JobRunnerManager()._create_nn(job_log_dir / 'job') + + # check the symlink exists + assert (job_log_dir.parent / "NN").is_symlink() + + +async def test_create_nn_old(one, start): + """Test _create_nn. + + It should remove existing job logs, if the dir already exists. + """ + async with start(one): + itask = one.pool.get_tasks()[0] + + # fake some old job logs + workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow)) + job_id = itask.tokens.duplicate(job='01').relative_id + job_log_dir = Path(workflow_job_log_dir, job_id) + job_log_dir.mkdir(parents=True) + + job_logs = [] + for name in JOB_LOG_OUT, JOB_LOG_ERR: + job_logs.append(job_log_dir / name) + + # create the logs + for job_log in job_logs: + job_log.touch() + + # call _create_nn + JobRunnerManager()._create_nn(job_log_dir / 'job') + + # check they were removed + for job_log in job_logs: + assert not job_log.is_file()