Skip to content

Commit

Permalink
Merge pull request #6555 from MetRonnie/tidy
Browse files Browse the repository at this point in the history
Tidy `TaskJobManager`: remove unecessary `workflow` arg from many methods
  • Loading branch information
hjoliver authored Jan 14, 2025
2 parents 690e610 + e78ea6b commit 931092b
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 155 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
if schd.get_run_mode() == RunMode.SIMULATION:
yield 0
itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
schd.task_job_mgr.poll_task_jobs(schd.workflow, itasks)
schd.task_job_mgr.poll_task_jobs(itasks)
yield len(bad_items)


Expand Down
2 changes: 0 additions & 2 deletions cylc/flow/run_modes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
'TaskProxy',
# the task's runtime config (with broadcasts applied)
Dict[str, Any],
# the workflow ID
str,
# the current time as (float_unix_time, str_ISO8601)
Tuple[float, str]
],
Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/run_modes/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def submit_task_job(
task_job_mgr: 'TaskJobManager',
itask: 'TaskProxy',
rtconfig: Dict[str, Any],
workflow: str,
now: Tuple[float, str]
) -> 'Literal[False]':
"""Submit a task in dummy mode.
Expand All @@ -77,7 +76,7 @@ def submit_task_job(
itask.summary[task_job_mgr.KEY_EXECUTE_TIME_LIMIT] = (
itask.mode_settings.simulated_run_length)
itask.jobs.append(
task_job_mgr.get_simulation_job_conf(itask, workflow))
task_job_mgr.get_simulation_job_conf(itask))
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
itask, {
'time_submit': now[1],
Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/run_modes/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def submit_task_job(
task_job_mgr: 'TaskJobManager',
itask: 'TaskProxy',
rtconfig: Dict[str, Any],
workflow: str,
now: Tuple[float, str]
) -> 'Literal[True]':
"""Submit a task in simulation mode.
Expand Down Expand Up @@ -84,7 +83,7 @@ def submit_task_job(
itask.mode_settings.simulated_run_length
)
itask.jobs.append(
task_job_mgr.get_simulation_job_conf(itask, workflow)
task_job_mgr.get_simulation_job_conf(itask)
)
task_job_mgr.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUBMITTED,
Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/run_modes/skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def submit_task_job(
task_job_mgr: 'TaskJobManager',
itask: 'TaskProxy',
rtconfig: Dict,
_workflow: str,
now: Tuple[float, str]
) -> 'Literal[True]':
"""Submit a task in skip mode.
Expand All @@ -65,7 +64,7 @@ def submit_task_job(
}
itask.summary['job_runner_name'] = RunMode.SKIP.value
itask.jobs.append(
task_job_mgr.get_simulation_job_conf(itask, _workflow)
task_job_mgr.get_simulation_job_conf(itask)
)
itask.run_mode = RunMode.SKIP
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ def process_queued_task_messages(self) -> None:
if should_poll:
to_poll_tasks.append(itask)
if to_poll_tasks:
self.task_job_mgr.poll_task_jobs(self.workflow, to_poll_tasks)
self.task_job_mgr.poll_task_jobs(to_poll_tasks)

# Remaining unprocessed messages have no corresponding task proxy.
# For example, if I manually set a running task to succeeded, the
Expand Down Expand Up @@ -1096,7 +1096,7 @@ def kill_tasks(
f"{', '.join(sorted(t.identity for t in unkillable))}"
)
if not jobless:
self.task_job_mgr.kill_task_jobs(self.workflow, to_kill)
self.task_job_mgr.kill_task_jobs(to_kill)

return len(unkillable)

Expand Down Expand Up @@ -1536,7 +1536,6 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
log = LOG.info

for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
itasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
Expand Down Expand Up @@ -1592,7 +1591,7 @@ def timeout_check(self):
self.check_workflow_timers()
# check submission and execution timeout and polling timers
if self.get_run_mode() != RunMode.SIMULATION:
self.task_job_mgr.check_task_jobs(self.workflow, self.pool)
self.task_job_mgr.check_task_jobs(self.pool)

async def workflow_shutdown(self):
"""Determines if the workflow can be shutdown yet."""
Expand Down
Loading

0 comments on commit 931092b

Please sign in to comment.