From e78ea6b94799580d5749ceadf9adb17bb05f8653 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Mon, 13 Jan 2025 15:17:41 +0000 Subject: [PATCH] Tidy TaskJobManager: remove unnecessary `workflow` arg from many methods --- cylc/flow/commands.py | 2 +- cylc/flow/run_modes/__init__.py | 2 - cylc/flow/run_modes/dummy.py | 3 +- cylc/flow/run_modes/simulation.py | 3 +- cylc/flow/run_modes/skip.py | 3 +- cylc/flow/scheduler.py | 7 +- cylc/flow/task_job_mgr.py | 179 +++++++++--------- tests/integration/conftest.py | 13 +- .../run_modes/test_mode_overrides.py | 2 - tests/integration/run_modes/test_nonlive.py | 3 - .../integration/run_modes/test_simulation.py | 33 ++-- tests/integration/run_modes/test_skip.py | 4 - tests/integration/test_job_runner_mgr.py | 1 - tests/integration/test_platforms.py | 4 +- tests/integration/test_task_events_mgr.py | 1 - tests/integration/test_task_job_mgr.py | 21 +- 16 files changed, 126 insertions(+), 155 deletions(-) diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 594f6587d5c..0f47029f751 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -254,7 +254,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]): if schd.get_run_mode() == RunMode.SIMULATION: yield 0 itasks, _, bad_items = schd.pool.filter_task_proxies(tasks) - schd.task_job_mgr.poll_task_jobs(schd.workflow, itasks) + schd.task_job_mgr.poll_task_jobs(itasks) yield len(bad_items) diff --git a/cylc/flow/run_modes/__init__.py b/cylc/flow/run_modes/__init__.py index 91d65dba4b1..2f094631380 100644 --- a/cylc/flow/run_modes/__init__.py +++ b/cylc/flow/run_modes/__init__.py @@ -31,8 +31,6 @@ 'TaskProxy', # the task's runtime config (with broadcasts applied) Dict[str, Any], - # the workflow ID - str, # the current time as (float_unix_time, str_ISO8601) Tuple[float, str] ], diff --git a/cylc/flow/run_modes/dummy.py b/cylc/flow/run_modes/dummy.py index 26d887d87dc..b5156f25e4c 100644 --- a/cylc/flow/run_modes/dummy.py +++ b/cylc/flow/run_modes/dummy.py @@ -50,7 +50,6 @@ def submit_task_job( task_job_mgr: 'TaskJobManager', itask: 'TaskProxy', rtconfig: Dict[str, Any], - workflow: str, now: Tuple[float, str] ) -> 'Literal[False]': """Submit a task in dummy mode. @@ -77,7 +76,7 @@ def submit_task_job( itask.summary[task_job_mgr.KEY_EXECUTE_TIME_LIMIT] = ( itask.mode_settings.simulated_run_length) itask.jobs.append( - task_job_mgr.get_simulation_job_conf(itask, workflow)) + task_job_mgr.get_simulation_job_conf(itask)) task_job_mgr.workflow_db_mgr.put_insert_task_jobs( itask, { 'time_submit': now[1], diff --git a/cylc/flow/run_modes/simulation.py b/cylc/flow/run_modes/simulation.py index 900a2c1fc4f..a9090bf0a84 100644 --- a/cylc/flow/run_modes/simulation.py +++ b/cylc/flow/run_modes/simulation.py @@ -51,7 +51,6 @@ def submit_task_job( task_job_mgr: 'TaskJobManager', itask: 'TaskProxy', rtconfig: Dict[str, Any], - workflow: str, now: Tuple[float, str] ) -> 'Literal[True]': """Submit a task in simulation mode.
@@ -84,7 +83,7 @@ def submit_task_job( itask.mode_settings.simulated_run_length ) itask.jobs.append( - task_job_mgr.get_simulation_job_conf(itask, workflow) + task_job_mgr.get_simulation_job_conf(itask) ) task_job_mgr.task_events_mgr.process_message( itask, INFO, TASK_OUTPUT_SUBMITTED, diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py index 49736883911..4e0789ed981 100644 --- a/cylc/flow/run_modes/skip.py +++ b/cylc/flow/run_modes/skip.py @@ -40,7 +40,6 @@ def submit_task_job( task_job_mgr: 'TaskJobManager', itask: 'TaskProxy', rtconfig: Dict, - _workflow: str, now: Tuple[float, str] ) -> 'Literal[True]': """Submit a task in skip mode. @@ -65,7 +64,7 @@ def submit_task_job( } itask.summary['job_runner_name'] = RunMode.SKIP.value itask.jobs.append( - task_job_mgr.get_simulation_job_conf(itask, _workflow) + task_job_mgr.get_simulation_job_conf(itask) ) itask.run_mode = RunMode.SKIP task_job_mgr.workflow_db_mgr.put_insert_task_jobs( diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index dec3a279ce9..c9003c6c70a 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -995,7 +995,7 @@ def process_queued_task_messages(self) -> None: if should_poll: to_poll_tasks.append(itask) if to_poll_tasks: - self.task_job_mgr.poll_task_jobs(self.workflow, to_poll_tasks) + self.task_job_mgr.poll_task_jobs(to_poll_tasks) # Remaining unprocessed messages have no corresponding task proxy. # For example, if I manually set a running task to succeeded, the @@ -1096,7 +1096,7 @@ def kill_tasks( f"{', '.join(sorted(t.identity for t in unkillable))}" ) if not jobless: - self.task_job_mgr.kill_task_jobs(self.workflow, to_kill) + self.task_job_mgr.kill_task_jobs(to_kill) return len(unkillable) @@ -1536,7 +1536,6 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool: log = LOG.info for itask in self.task_job_mgr.submit_task_jobs( - self.workflow, itasks, self.server.curve_auth, self.server.client_pub_key_dir, @@ -1592,7 +1591,7 @@ def timeout_check(self): self.check_workflow_timers() # check submission and execution timeout and polling timers if self.get_run_mode() != RunMode.SIMULATION: - self.task_job_mgr.check_task_jobs(self.workflow, self.pool) + self.task_job_mgr.check_task_jobs(self.pool) async def workflow_shutdown(self): """Determines if the workflow can be shutdown yet.""" diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 7e1cfa02cc4..c57a5aba78b 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -168,7 +168,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr, self.task_remote_mgr = TaskRemoteMgr( workflow, proc_pool, self.bad_hosts, self.workflow_db_mgr) - def check_task_jobs(self, workflow, task_pool): + def check_task_jobs(self, task_pool): """Check submission and execution timeout and polling timers. Poll tasks that have timed out and/or have reached next polling time. 
@@ -184,19 +184,20 @@ def check_task_jobs(self, workflow, task_pool): f"{itask.poll_timer.delay_timeout_as_str()})" ) if poll_tasks: - self.poll_task_jobs(workflow, poll_tasks) + self.poll_task_jobs(poll_tasks) def kill_task_jobs( - self, workflow: str, itasks: 'Iterable[TaskProxy]' + self, itasks: 'Iterable[TaskProxy]' ) -> None: """Issue the command to kill jobs of active tasks.""" self._run_job_cmd( - self.JOBS_KILL, workflow, itasks, + self.JOBS_KILL, + itasks, self._kill_task_jobs_callback, - self._kill_task_jobs_callback_255 + self._kill_task_jobs_callback_255, ) - def poll_task_jobs(self, workflow, itasks, msg=None): + def poll_task_jobs(self, itasks, msg=None): """Poll jobs of specified tasks. This method uses _poll_task_jobs_callback() and @@ -208,7 +209,7 @@ def poll_task_jobs(self, workflow, itasks, msg=None): if msg is not None: LOG.info(msg) self._run_job_cmd( - self.JOBS_POLL, workflow, + self.JOBS_POLL, [ # Don't poll waiting tasks. (This is not only pointless, it # is dangerous because a task waiting to rerun has the @@ -220,7 +221,7 @@ def poll_task_jobs(self, workflow, itasks, msg=None): self._poll_task_jobs_callback_255 ) - def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True): + def prep_submit_task_jobs(self, itasks, check_syntax=True): """Prepare task jobs for submit. Prepare tasks where possible. Ignore tasks that are waiting for host @@ -239,7 +240,8 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True): itask.state_reset(TASK_STATUS_PREPARING) self.data_store_mgr.delta_task_state(itask) prep_task = self._prep_submit_task_job( - workflow, itask, check_syntax=check_syntax) + itask, check_syntax=check_syntax + ) if prep_task: prepared_tasks.append(itask) elif prep_task is False: @@ -248,7 +250,6 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True): def submit_task_jobs( self, - workflow, itasks, curve_auth, client_pub_key_dir, @@ -270,16 +271,18 @@ def submit_task_jobs( """ # submit "simulation/skip" mode tasks, modify "dummy" task configs: itasks, submitted_nonlive_tasks = self.submit_nonlive_task_jobs( - workflow, itasks, run_mode) + itasks, run_mode + ) # submit "live" mode tasks (and "dummy" mode tasks) submitted_live_tasks = self.submit_livelike_task_jobs( - workflow, itasks, curve_auth, client_pub_key_dir) + itasks, curve_auth, client_pub_key_dir + ) return submitted_nonlive_tasks + submitted_live_tasks def submit_livelike_task_jobs( - self, workflow, itasks, curve_auth, client_pub_key_dir + self, itasks, curve_auth, client_pub_key_dir ) -> 'List[TaskProxy]': """Submission for live tasks and dummy tasks. 
""" @@ -287,8 +290,7 @@ def submit_livelike_task_jobs( # Mapping of platforms to task proxies: auth_itasks: 'Dict[str, List[TaskProxy]]' = {} - prepared_tasks, bad_tasks = self.prep_submit_task_jobs( - workflow, itasks) + prepared_tasks, bad_tasks = self.prep_submit_task_jobs(itasks) # Reset consumed host selection results self.task_remote_mgr.subshell_eval_reset() @@ -351,7 +353,7 @@ def submit_livelike_task_jobs( itask.waiting_on_job_prep = False itask.local_job_file_path = None self._prep_submit_task_job_error( - workflow, itask, '(remote init)', '' + itask, '(remote init)', '' ) # Now that all hosts on all platforms in platform # group selected in task config are exhausted we clear @@ -500,10 +502,14 @@ def submit_livelike_task_jobs( self.JOBS_SUBMIT, '(init %s)' % host, err=init_error, - ret_code=1), - workflow, itask.point, itask.tdef.name) + ret_code=1, + ), + self.workflow, + itask.point, + itask.tdef.name, + ) self._prep_submit_task_job_error( - workflow, itask, '(remote init)', '' + itask, '(remote init)', '' ) continue @@ -528,7 +534,7 @@ def submit_livelike_task_jobs( 'job submission executable paths'] + SYSPATH: cmd.append(f"--path={path}") cmd.append('--') - cmd.append(get_remote_workflow_run_job_dir(workflow)) + cmd.append(get_remote_workflow_run_job_dir(self.workflow)) # Chop itasks into a series of shorter lists if it's very big # to prevent overloading of stdout and stderr pipes. itasks = sorted(itasks, key=lambda itask: itask.identity) @@ -566,8 +572,10 @@ def submit_livelike_task_jobs( stdin_files.append( os.path.expandvars( get_task_job_job_log( - workflow, itask.point, itask.tdef.name, - itask.submit_num + self.workflow, + itask.point, + itask.tdef.name, + itask.submit_num, ) ) ) @@ -595,13 +603,12 @@ def submit_livelike_task_jobs( ), bad_hosts=self.task_remote_mgr.bad_hosts, callback=self._submit_task_jobs_callback, - callback_args=[workflow, itasks_batch], + callback_args=[itasks_batch], callback_255=self._submit_task_jobs_callback_255, ) return done_tasks - @staticmethod - def _create_job_log_path(workflow, itask): + def _create_job_log_path(self, itask): """Create job log directory for a task job, etc. Create local job directory, and NN symbolic link. 
@@ -611,7 +618,8 @@ def _create_job_log_path(workflow, itask): """ job_file_dir = get_task_job_log( - workflow, itask.point, itask.tdef.name, itask.submit_num) + self.workflow, itask.point, itask.tdef.name, itask.submit_num + ) job_file_dir = os.path.expandvars(job_file_dir) task_log_dir = os.path.dirname(job_file_dir) if itask.submit_num == 1: @@ -646,8 +654,7 @@ def _create_job_log_path(workflow, itask): exc.filename = target raise exc - @staticmethod - def _job_cmd_out_callback(workflow, itask, cmd_ctx, line): + def _job_cmd_out_callback(self, itask, cmd_ctx, line): """Callback on job command STDOUT/STDERR.""" if cmd_ctx.cmd_kwargs.get("host"): host = "(%(host)s) " % cmd_ctx.cmd_kwargs @@ -660,7 +667,7 @@ def _job_cmd_out_callback(workflow, itask, cmd_ctx, line): else: line = "%s %s" % (timestamp, content) job_activity_log = get_task_job_activity_log( - workflow, itask.point, itask.tdef.name) + self.workflow, itask.point, itask.tdef.name) if not line.endswith("\n"): line += "\n" line = host + line @@ -671,29 +678,27 @@ def _job_cmd_out_callback(workflow, itask, cmd_ctx, line): LOG.warning("%s: write failed\n%s" % (job_activity_log, exc)) LOG.warning(f"[{itask}] {host}{line}") - def _kill_task_jobs_callback(self, ctx, workflow, itasks): + def _kill_task_jobs_callback(self, ctx, itasks): """Callback when kill tasks command exits.""" self._manip_task_jobs_callback( ctx, - workflow, itasks, self._kill_task_job_callback, {self.job_runner_mgr.OUT_PREFIX_COMMAND: self._job_cmd_out_callback} ) - def _kill_task_jobs_callback_255(self, ctx, workflow, itasks): + def _kill_task_jobs_callback_255(self, ctx, itasks): """Callback when kill tasks command exits.""" self._manip_task_jobs_callback( ctx, - workflow, itasks, self._kill_task_job_callback_255, {self.job_runner_mgr.OUT_PREFIX_COMMAND: self._job_cmd_out_callback} ) - def _kill_task_job_callback_255(self, workflow, itask, cmd_ctx, line): + def _kill_task_job_callback_255(self, itask, cmd_ctx, line): """Helper for _kill_task_jobs_callback, on one task job.""" with suppress(NoHostsError): # if there is another host to kill on, try again, otherwise fail @@ -701,9 +706,9 @@ def _kill_task_job_callback_255(self, workflow, itask, cmd_ctx, line): itask.platform, bad_hosts=self.task_remote_mgr.bad_hosts ) - self.kill_task_jobs(workflow, [itask]) + self.kill_task_jobs([itask]) - def _kill_task_job_callback(self, workflow, itask, cmd_ctx, line): + def _kill_task_job_callback(self, itask, cmd_ctx, line): """Helper for _kill_task_jobs_callback, on one task job.""" ctx = SubProcContext(self.JOBS_KILL, None) ctx.out = line @@ -716,7 +721,7 @@ def _kill_task_job_callback(self, workflow, itask, cmd_ctx, line): ctx.ret_code = int(ctx.ret_code) if ctx.ret_code: ctx.cmd = cmd_ctx.cmd # print original command on failure - log_task_job_activity(ctx, workflow, itask.point, itask.tdef.name) + log_task_job_activity(ctx, self.workflow, itask.point, itask.tdef.name) log_lvl = WARNING log_msg = 'job killed' if ctx.ret_code: # non-zero exit status @@ -744,8 +749,8 @@ def _kill_task_job_callback(self, workflow, itask, cmd_ctx, line): LOG.log(log_lvl, f"[{itask}] {log_msg}") def _manip_task_jobs_callback( - self, ctx, workflow, itasks, summary_callback, - more_callbacks=None): + self, ctx, itasks, summary_callback, more_callbacks=None + ): """Callback when submit/poll/kill tasks command exits.""" # Swallow SSH 255 (can't contact host) errors unless debugging. 
if ( @@ -793,7 +798,7 @@ def _manip_task_jobs_callback( if prefix == self.job_runner_mgr.OUT_PREFIX_SUMMARY: del bad_tasks[(point, name, submit_num)] itask = tasks[(point, name, submit_num)] - callback(workflow, itask, ctx, line) + callback(itask, ctx, line) except (LookupError, ValueError) as exc: # (Note this catches KeyError too). LOG.warning( @@ -804,38 +809,36 @@ def _manip_task_jobs_callback( for key, itask in sorted(bad_tasks.items()): line = ( "|".join([ctx.timestamp, os.sep.join(key), "1"]) + "\n") - summary_callback(workflow, itask, ctx, line) + summary_callback(itask, ctx, line) - def _poll_task_jobs_callback(self, ctx, workflow, itasks): + def _poll_task_jobs_callback(self, ctx, itasks): """Callback when poll tasks command exits.""" self._manip_task_jobs_callback( ctx, - workflow, itasks, self._poll_task_job_callback, {self.job_runner_mgr.OUT_PREFIX_MESSAGE: self._poll_task_job_message_callback}) - def _poll_task_jobs_callback_255(self, ctx, workflow, itasks): + def _poll_task_jobs_callback_255(self, ctx, itasks): """Callback when poll tasks command exits.""" self._manip_task_jobs_callback( ctx, - workflow, itasks, self._poll_task_job_callback_255, {self.job_runner_mgr.OUT_PREFIX_MESSAGE: self._poll_task_job_message_callback}) - def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line): + def _poll_task_job_callback_255(self, itask, cmd_ctx, line): with suppress(NoHostsError): # if there is another host to poll on, try again, otherwise fail get_host_from_platform( itask.platform, bad_hosts=self.task_remote_mgr.bad_hosts ) - self.poll_task_jobs(workflow, [itask]) + self.poll_task_jobs([itask]) - def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): + def _poll_task_job_callback(self, itask, cmd_ctx, line): """Helper for _poll_task_jobs_callback, on one task job.""" ctx = SubProcContext(self.JOBS_POLL, None) ctx.out = line @@ -855,7 +858,9 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): ctx.cmd = cmd_ctx.cmd # print original command on failure return finally: - log_task_job_activity(ctx, workflow, itask.point, itask.tdef.name) + log_task_job_activity( + ctx, self.workflow, itask.point, itask.tdef.name + ) flag = self.task_events_mgr.FLAG_POLLED # Only log at INFO level if manually polling @@ -905,7 +910,7 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): itask, log_lvl, TASK_STATUS_SUBMITTED, jp_ctx.time_submit_exit, flag) - def _poll_task_job_message_callback(self, workflow, itask, cmd_ctx, line): + def _poll_task_job_message_callback(self, itask, cmd_ctx, line): """Helper for _poll_task_jobs_callback, on message of one task job.""" ctx = SubProcContext(self.JOBS_POLL, None) ctx.out = line @@ -919,10 +924,10 @@ def _poll_task_job_message_callback(self, workflow, itask, cmd_ctx, line): self.task_events_mgr.process_message( itask, severity, message, event_time, self.task_events_mgr.FLAG_POLLED) - log_task_job_activity(ctx, workflow, itask.point, itask.tdef.name) + log_task_job_activity(ctx, self.workflow, itask.point, itask.tdef.name) def _run_job_cmd( - self, cmd_key, workflow, itasks, callback, callback_255 + self, cmd_key, itasks, callback, callback_255 ): """Run job commands, e.g. poll, kill, etc. 
@@ -964,7 +969,7 @@ def _run_job_cmd( if LOG.isEnabledFor(DEBUG): cmd.append("--debug") cmd.append("--") - cmd.append(get_remote_workflow_run_job_dir(workflow)) + cmd.append(get_remote_workflow_run_job_dir(self.workflow)) job_log_dirs = [] host = 'localhost' @@ -980,7 +985,7 @@ def _run_job_cmd( except NoHostsError: ctx.err = f'No available hosts for {platform["name"]}' LOG.debug(ctx) - callback_255(ctx, workflow, itasks) + callback_255(ctx, itasks) continue else: ctx = SubProcContext(cmd_key, cmd, host=host) @@ -997,7 +1002,7 @@ def _run_job_cmd( ctx, bad_hosts=self.task_remote_mgr.bad_hosts, callback=callback, - callback_args=[workflow, itasks], + callback_args=[itasks], callback_255=callback_255, ) @@ -1028,7 +1033,6 @@ def _set_retry_timers( def submit_nonlive_task_jobs( self: 'TaskJobManager', - workflow: str, itasks: 'List[TaskProxy]', workflow_run_mode: RunMode, ) -> 'Tuple[List[TaskProxy], List[TaskProxy]]': @@ -1074,9 +1078,7 @@ def submit_nonlive_task_jobs( # Submit nonlive tasks, or add live-like (live or dummy) # tasks to list of tasks to put through live submission pipeline. submit_func = itask.run_mode.get_submit_method() - if submit_func and submit_func( - self, itask, rtconfig, workflow, now - ): + if submit_func and submit_func(self, itask, rtconfig, now): # A submit function returns true if this is a nonlive task: self.workflow_db_mgr.put_insert_task_states(itask) nonlive_tasks.append(itask) @@ -1085,36 +1087,32 @@ def submit_nonlive_task_jobs( return lively_tasks, nonlive_tasks - def _submit_task_jobs_callback(self, ctx, workflow, itasks): + def _submit_task_jobs_callback(self, ctx, itasks): """Callback when submit task jobs command exits.""" self._manip_task_jobs_callback( ctx, - workflow, itasks, self._submit_task_job_callback, {self.job_runner_mgr.OUT_PREFIX_COMMAND: self._job_cmd_out_callback} ) - def _submit_task_jobs_callback_255(self, ctx, workflow, itasks): + def _submit_task_jobs_callback_255(self, ctx, itasks): """Callback when submit task jobs command exits.""" self._manip_task_jobs_callback( ctx, - workflow, itasks, self._submit_task_job_callback_255, {self.job_runner_mgr.OUT_PREFIX_COMMAND: self._job_cmd_out_callback} ) - def _submit_task_job_callback_255( - self, workflow, itask, cmd_ctx, line - ): + def _submit_task_job_callback_255(self, itask, cmd_ctx, line): """Helper for _submit_task_jobs_callback, on one task job.""" # send this task back for submission again itask.waiting_on_job_prep = True # (task is in the preparing state) - def _submit_task_job_callback(self, workflow, itask, cmd_ctx, line): + def _submit_task_job_callback(self, itask, cmd_ctx, line): """Helper for _submit_task_jobs_callback, on one task job.""" ctx = SubProcContext(self.JOBS_SUBMIT, None, cmd_ctx.host) ctx.out = line @@ -1129,7 +1127,9 @@ def _submit_task_job_callback(self, workflow, itask, cmd_ctx, line): if ctx.ret_code: ctx.cmd = cmd_ctx.cmd # print original command on failure if cmd_ctx.ret_code != 255: - log_task_job_activity(ctx, workflow, itask.point, itask.tdef.name) + log_task_job_activity( + ctx, self.workflow, itask.point, itask.tdef.name + ) if ctx.ret_code == SubProcPool.RET_CODE_WORKFLOW_STOPPING: return @@ -1149,7 +1149,6 @@ def _submit_task_job_callback(self, workflow, itask, cmd_ctx, line): def _prep_submit_task_job( self, - workflow: str, itask: 'TaskProxy', check_syntax: bool = True ): @@ -1208,10 +1207,10 @@ def _prep_submit_task_job( itask.waiting_on_job_prep = False itask.summary['platforms_used'][itask.submit_num] = '' # Retry delays, needed for the try_num 
- self._create_job_log_path(workflow, itask) + self._create_job_log_path(itask) self._set_retry_timers(itask, rtconfig) self._prep_submit_task_job_error( - workflow, itask, '(remote host select)', exc + itask, '(remote host select)', exc ) return False else: @@ -1246,17 +1245,19 @@ def _prep_submit_task_job( itask.waiting_on_job_prep = False itask.summary['platforms_used'][itask.submit_num] = '' # Retry delays, needed for the try_num - self._create_job_log_path(workflow, itask) + self._create_job_log_path(itask) if isinstance(exc, NoPlatformsError): # Clear all hosts from all platforms in group from # bad_hosts: self.bad_hosts -= exc.hosts_consumed self._set_retry_timers(itask, rtconfig) self._prep_submit_task_job_error( - workflow, itask, '(no platforms available)', exc) + itask, '(no platforms available)', exc + ) return False self._prep_submit_task_job_error( - workflow, itask, '(platform not defined)', exc) + itask, '(platform not defined)', exc + ) return False else: itask.platform = platform @@ -1265,14 +1266,13 @@ def _prep_submit_task_job( try: job_conf = self._prep_submit_task_job_impl( - workflow, itask, rtconfig, ) itask.jobs.append(job_conf) local_job_file_path = get_task_job_job_log( - workflow, + self.workflow, itask.point, itask.tdef.name, itask.submit_num, @@ -1285,18 +1285,22 @@ def _prep_submit_task_job( except Exception as exc: # Could be a bad command template, IOError, etc itask.waiting_on_job_prep = False - self._prep_submit_task_job_error( - workflow, itask, '(prepare job file)', exc) + self._prep_submit_task_job_error(itask, '(prepare job file)', exc) return False itask.local_job_file_path = local_job_file_path return itask - def _prep_submit_task_job_error(self, workflow, itask, action, exc): + def _prep_submit_task_job_error( + self, + itask: 'TaskProxy', + action: str, + exc: Union[Exception, str], + ) -> None: """Helper for self._prep_submit_task_job. 
On error.""" log_task_job_activity( SubProcContext(self.JOBS_SUBMIT, action, err=exc, ret_code=1), - workflow, + self.workflow, itask.point, itask.tdef.name, submit_num=itask.submit_num @@ -1328,7 +1332,7 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc): self.task_events_mgr.process_message( itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED) - def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): + def _prep_submit_task_job_impl(self, itask, rtconfig): """Helper for self._prep_submit_task_job.""" itask.summary['platforms_used'][ @@ -1342,13 +1346,13 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): ] = self.get_execution_time_limit(rtconfig['execution time limit']) # Location of job file, etc - self._create_job_log_path(workflow, itask) + self._create_job_log_path(itask) job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id job_file_path = get_remote_workflow_run_job_dir( - workflow, job_d, JOB_LOG_JOB) + self.workflow, job_d, JOB_LOG_JOB + ) return self.get_job_conf( - workflow, itask, rtconfig, job_file_path=job_file_path, @@ -1381,7 +1385,6 @@ def get_execution_time_limit( def get_job_conf( self, - workflow, itask, rtconfig, job_file_path=None, @@ -1419,14 +1422,14 @@ def get_job_conf( 'script': rtconfig['script'], 'submit_num': itask.submit_num, 'flow_nums': itask.flow_nums, - 'workflow_name': workflow, + 'workflow_name': self.workflow, 'task_id': itask.identity, 'try_num': itask.get_try_num(), 'uuid_str': self.task_events_mgr.uuid_str, 'work_d': rtconfig['work sub-directory'], } - def get_simulation_job_conf(self, itask, workflow): + def get_simulation_job_conf(self, itask): """Return a job config for a simulated task.""" return { # NOTE: these fields should match _prep_submit_task_job_impl @@ -1450,7 +1453,7 @@ def get_simulation_job_conf(self, itask, workflow): 'script': 'SIMULATION', 'submit_num': itask.submit_num, 'flow_nums': itask.flow_nums, - 'workflow_name': workflow, + 'workflow_name': self.workflow, 'task_id': itask.identity, 'try_num': itask.get_try_num(), 'uuid_str': self.task_events_mgr.uuid_str, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index cfcd137b453..c5e84158ab4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -55,7 +55,6 @@ from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id from cylc.flow.workflow_status import StopMode -from cylc.flow.task_state import TASK_STATUS_SUBMITTED from .utils import _rm_if_empty from .utils.flow_tools import ( @@ -424,7 +423,7 @@ def capture_submission(): def _disable_submission(schd: 'Scheduler') -> 'Set[TaskProxy]': submitted_tasks: 'Set[TaskProxy]' = set() - def _submit_task_jobs(_, itasks, *args, **kwargs): + def _submit_task_jobs(itasks, *args, **kwargs): nonlocal submitted_tasks for itask in itasks: itask.state_reset(TASK_STATUS_SUBMITTED) @@ -456,7 +455,7 @@ def _disable_polling(schd: 'Scheduler') -> 'Set[TaskProxy]': polled_tasks: 'Set[TaskProxy]' = set() def run_job_cmd( - _1, _2, itasks, _3, _4=None + _1, itasks, _3, _4=None ): nonlocal polled_tasks polled_tasks.update(itasks) @@ -740,8 +739,8 @@ def capture_live_submissions(capcall, monkeypatch): If you call this fixture from a test, it will return a set of tasks that would have been submitted had this fixture not been used. 
""" - def fake_submit(self, _workflow, itasks, *_): - self.submit_nonlive_task_jobs(_workflow, itasks, RunMode.SIMULATION) + def fake_submit(self, itasks, *_): + self.submit_nonlive_task_jobs(itasks, RunMode.SIMULATION) for itask in itasks: for status in (TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED): self.task_events_mgr.process_message( @@ -758,13 +757,11 @@ def fake_submit(self, _workflow, itasks, *_): 'cylc.flow.task_job_mgr.TaskJobManager.submit_livelike_task_jobs', fake_submit) - - def get_submissions(): nonlocal submit_live_calls return { itask.identity - for ((_self, _workflow, itasks, *_), _kwargs) in submit_live_calls + for ((_self, itasks, *_), _kwargs) in submit_live_calls for itask in itasks } diff --git a/tests/integration/run_modes/test_mode_overrides.py b/tests/integration/run_modes/test_mode_overrides.py index 25cdf6cf68a..bcc94d9eebf 100644 --- a/tests/integration/run_modes/test_mode_overrides.py +++ b/tests/integration/run_modes/test_mode_overrides.py @@ -96,7 +96,6 @@ async def test_force_trigger_does_not_override_run_mode( # ... but job submission will always change this to the correct mode: schd.task_job_mgr.submit_task_jobs( - schd.workflow, [foo], schd.server.curve_auth, schd.server.client_pub_key_dir) @@ -158,7 +157,6 @@ async def test_run_mode_override_from_broadcast( foo_1001 = schd.pool.get_task(ISO8601Point('1001'), 'foo') schd.task_job_mgr.submit_task_jobs( - schd.workflow, [foo_1000, foo_1001], schd.server.curve_auth, schd.server.client_pub_key_dir) diff --git a/tests/integration/run_modes/test_nonlive.py b/tests/integration/run_modes/test_nonlive.py index 90cefbf7701..e62d250ce99 100644 --- a/tests/integration/run_modes/test_nonlive.py +++ b/tests/integration/run_modes/test_nonlive.py @@ -60,7 +60,6 @@ def submit_and_check_db(): def _inner(schd): # Submit task jobs: schd.task_job_mgr.submit_task_jobs( - schd.workflow, schd.pool.get_tasks(), schd.server.curve_auth, schd.server.client_pub_key_dir @@ -125,7 +124,6 @@ async def test_db_task_states( schd = scheduler(flow(conf)) async with start(schd): schd.task_job_mgr.submit_task_jobs( - schd.workflow, schd.pool.get_tasks(), schd.server.curve_auth, schd.server.client_pub_key_dir @@ -166,7 +164,6 @@ async def test_mean_task_time( # Submit two tasks: schd.task_job_mgr.submit_task_jobs( - schd.workflow, [itask], schd.server.curve_auth, schd.server.client_pub_key_dir diff --git a/tests/integration/run_modes/test_simulation.py b/tests/integration/run_modes/test_simulation.py index 72cbd7e10f1..3d6459bd11f 100644 --- a/tests/integration/run_modes/test_simulation.py +++ b/tests/integration/run_modes/test_simulation.py @@ -63,8 +63,7 @@ def _run_simjob(schd, point, task): itask = schd.pool.get_task(point, task) itask.state.is_queued = False monkeytime(0) - schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + schd.task_job_mgr.submit_nonlive_task_jobs([itask], RunMode.SIMULATION) monkeytime(itask.mode_settings.timeout + 1) # Run Time Check @@ -171,8 +170,7 @@ def test_fail_once(sim_time_check_setup, itask, point, results, monkeypatch): for i, result in enumerate(results): itask.try_timers['execution-retry'].num = i - schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + schd.task_job_mgr.submit_nonlive_task_jobs([itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is result @@ -192,7 +190,8 @@ def test_task_finishes(sim_time_check_setup, monkeytime, caplog): fail_all_1066.state.status = 'running' fail_all_1066.state.is_queued = 
False schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [fail_all_1066], RunMode.SIMULATION) + [fail_all_1066], RunMode.SIMULATION + ) # For the purpose of the test delete the started time set by # submit_nonlive_task_jobs. @@ -222,7 +221,8 @@ def test_task_sped_up(sim_time_check_setup, monkeytime): # Run the job submission method: monkeytime(0) schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [fast_forward_1066], RunMode.SIMULATION) + [fast_forward_1066], RunMode.SIMULATION + ) fast_forward_1066.state.is_queued = False result = sim_time_check(schd.task_events_mgr, [fast_forward_1066], '') @@ -273,7 +273,8 @@ async def test_settings_restart(monkeytime, flow, scheduler, start): og_timeouts = {} for itask in schd.pool.get_tasks(): schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + [itask], RunMode.SIMULATION + ) og_timeouts[itask.identity] = itask.mode_settings.timeout @@ -391,8 +392,7 @@ async def test_settings_broadcast( itask.state.is_queued = False # Submit the first - the sim task will fail: - schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + schd.task_job_mgr.submit_nonlive_task_jobs([itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is True # Let task finish. @@ -410,14 +410,12 @@ async def test_settings_broadcast( 'simulation': {'fail cycle points': ''} }]) # Submit again - result is different: - schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + schd.task_job_mgr.submit_nonlive_task_jobs([itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is False # Assert Clearing the broadcast works schd.broadcast_mgr.clear_broadcast() - schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + schd.task_job_mgr.submit_nonlive_task_jobs([itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is True # Assert that list of broadcasts doesn't change if we submit @@ -427,8 +425,7 @@ async def test_settings_broadcast( ['1066'], ['one'], [{ 'simulation': {'fail cycle points': 'higadfuhasgiurguj'} }]) - schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + schd.task_job_mgr.submit_nonlive_task_jobs([itask], RunMode.SIMULATION) assert ( 'Invalid ISO 8601 date representation: higadfuhasgiurguj' in log.messages[-1]) @@ -441,8 +438,7 @@ async def test_settings_broadcast( ['1066'], ['one'], [{ 'simulation': {'fail cycle points': '1'} }]) - schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + schd.task_job_mgr.submit_nonlive_task_jobs([itask], RunMode.SIMULATION) assert ( 'Invalid ISO 8601 date representation: 1' in log.messages[-1]) @@ -453,8 +449,7 @@ async def test_settings_broadcast( 'simulation': {'fail cycle points': '1945, 1977, 1066'}, 'execution retry delays': '3*PT2S' }]) - schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], RunMode.SIMULATION) + schd.task_job_mgr.submit_nonlive_task_jobs([itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is True assert itask.try_timers['execution-retry'].delays == [2.0, 2.0, 2.0] # n.b. 
rtconfig should remain unchanged, lest we cancel broadcasts: diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py index 5c9c31a6d56..29d34de7a75 100644 --- a/tests/integration/run_modes/test_skip.py +++ b/tests/integration/run_modes/test_skip.py @@ -52,7 +52,6 @@ async def test_settings_override_from_broadcast( foo, = schd.pool.get_tasks() schd.task_job_mgr.submit_task_jobs( - schd.workflow, schd.pool.get_tasks(), schd.server.curve_auth, schd.server.client_pub_key_dir @@ -216,7 +215,6 @@ async def test_prereqs_marked_satisfied_by_skip_mode( async with start(schd): foo = schd.pool.get_task(IntegerPoint(1), 'foo') schd.task_job_mgr.submit_task_jobs( - schd.workflow, [foo], schd.server.curve_auth, schd.server.client_pub_key_dir, @@ -241,7 +239,6 @@ async def test_outputs_can_be_changed(one_conf, flow, start, scheduler, validate ], ) schd.task_job_mgr.submit_task_jobs( - schd.workflow, schd.pool.get_tasks(), None, None @@ -252,7 +249,6 @@ async def test_outputs_can_be_changed(one_conf, flow, start, scheduler, validate ['1'], ['one'], [{'skip': {'outputs': 'succeeded'}}] ) schd.task_job_mgr.submit_task_jobs( - schd.workflow, schd.pool.get_tasks(), None, None diff --git a/tests/integration/test_job_runner_mgr.py b/tests/integration/test_job_runner_mgr.py index 35858ebbd49..6fada42c977 100644 --- a/tests/integration/test_job_runner_mgr.py +++ b/tests/integration/test_job_runner_mgr.py @@ -73,7 +73,6 @@ async def test_kill_error(one, start, test_dir, capsys, log_filter): out=out, err=err, ), - one.workflow, [itask], ) diff --git a/tests/integration/test_platforms.py b/tests/integration/test_platforms.py index f8187227ceb..7cbd4b2d350 100644 --- a/tests/integration/test_platforms.py +++ b/tests/integration/test_platforms.py @@ -23,7 +23,7 @@ async def test_prep_submit_task_tries_multiple_platforms( task platform setting matches a group, and that after all platforms have been tried that the hosts matching that platform group are cleared. - + See https://github.com/cylc/cylc-flow/pull/6109 """ global_conf = ''' @@ -47,7 +47,7 @@ async def test_prep_submit_task_tries_multiple_platforms( itask.submit_num = 1 # simulate failed attempts to contact the job hosts schd.task_job_mgr.bad_hosts = {'broken', 'broken2'} - res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask) + res = schd.task_job_mgr._prep_submit_task_job(itask) assert res is False # ensure the bad hosts have been cleared assert not schd.task_job_mgr.bad_hosts diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py index ac1fb2f9344..0c1ed5a7253 100644 --- a/tests/integration/test_task_events_mgr.py +++ b/tests/integration/test_task_events_mgr.py @@ -152,7 +152,6 @@ async def test__always_insert_task_job( schd.bad_hosts = {'no-such-host-1', 'no-such-host-2'} async with start(schd): schd.task_job_mgr.submit_task_jobs( - schd.workflow, schd.pool.get_tasks(), schd.server.curve_auth, schd.server.client_pub_key_dir, diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index b1cf1347071..5bdb569438d 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -84,10 +84,7 @@ async def test_run_job_cmd_no_hosts_error( schd.task_job_mgr.bad_hosts.add('no-host-platform') # polling the task should not result in an error... 
- schd.task_job_mgr.poll_task_jobs( - schd.workflow, - schd.pool.get_tasks() - ) + schd.task_job_mgr.poll_task_jobs(schd.pool.get_tasks()) # ...but the failure should be logged assert log_filter( @@ -96,10 +93,7 @@ async def test_run_job_cmd_no_hosts_error( log.clear() # killing the task should not result in an error... - schd.task_job_mgr.kill_task_jobs( - schd.workflow, - schd.pool.get_tasks() - ) + schd.task_job_mgr.kill_task_jobs(schd.pool.get_tasks()) # ...but the failure should be logged assert log_filter( @@ -119,7 +113,6 @@ async def test__run_job_cmd_logs_platform_lookup_fail( from types import SimpleNamespace schd.task_job_mgr._run_job_cmd( schd.task_job_mgr.JOBS_POLL, - 'foo', [SimpleNamespace(platform={'name': 'culdee fell summit'})], None, None @@ -163,14 +156,16 @@ async def test__prep_submit_task_job_impl_handles_execution_time_limit( # in the summary state. with suppress(FileExistsError): schd.task_job_mgr._prep_submit_task_job_impl( - schd.workflow, task_a, task_a.tdef.rtconfig) + task_a, task_a.tdef.rtconfig + ) assert task_a.summary['execution_time_limit'] == 5.0 # If we delete the etl it gets deleted in the summary: task_a.tdef.rtconfig['execution time limit'] = None with suppress(FileExistsError): schd.task_job_mgr._prep_submit_task_job_impl( - schd.workflow, task_a, task_a.tdef.rtconfig) + task_a, task_a.tdef.rtconfig + ) assert not task_a.summary.get('execution_time_limit', '') # put everything back and test broadcast too. @@ -181,8 +176,7 @@ async def test__prep_submit_task_job_impl_handles_execution_time_limit( with suppress(FileExistsError): # We run a higher level function here to ensure # that the broadcast is applied. - schd.task_job_mgr._prep_submit_task_job( - schd.workflow, task_a) + schd.task_job_mgr._prep_submit_task_job(task_a) assert not task_a.summary.get('execution_time_limit', '') @@ -224,7 +218,6 @@ async def test_broadcast_platform_change( # Attempt job submission: schd.task_job_mgr.submit_task_jobs( - schd.workflow, schd.pool.get_tasks(), schd.server.curve_auth, schd.server.client_pub_key_dir)
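
Every hunk above applies one mechanical pattern: the workflow ID is already passed to `TaskJobManager.__init__` and stored on the instance, so the manager's methods (and the run-mode `submit_task_job` functions they dispatch to) no longer need to be handed a `workflow` argument that could only ever duplicate, or accidentally disagree with, `self.workflow`. A minimal sketch of the pattern, using hypothetical stand-in classes rather than the real cylc-flow API:

    # Hypothetical stand-in classes illustrating the refactor pattern;
    # this is not the real cylc-flow API.

    class ManagerBefore:
        def __init__(self, workflow: str):
            self.workflow = workflow  # stored at construction...

        def poll_task_jobs(self, workflow: str, itasks: list) -> None:
            # ...yet every caller still had to pass it back in
            print(f"polling {len(itasks)} job(s) in {workflow}")


    class ManagerAfter:
        def __init__(self, workflow: str):
            self.workflow = workflow

        def poll_task_jobs(self, itasks: list) -> None:
            # the method reads the attribute instead
            print(f"polling {len(itasks)} job(s) in {self.workflow}")


    if __name__ == '__main__':
        mgr = ManagerAfter('my-workflow/run1')
        mgr.poll_task_jobs(['1/foo', '1/bar'])  # no workflow argument

Two knock-on effects are visible in the diff: `_create_job_log_path` and `_job_cmd_out_callback` lose their `@staticmethod` decorators, since reading `self.workflow` requires an instance, and test doubles that mimic the old signatures (e.g. `fake_submit` in the `capture_live_submissions` fixture) drop the corresponding `workflow` parameter.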