From 97fa1e58f97627777e051844630408b12a598b2a Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Tue, 27 Jul 2021 18:46:59 -0700 Subject: [PATCH 01/14] Add proper shutdown support for monkeys The previous way monkeys were stopped involved races between the ongoing loop and the stop code and no clean shutdown, just a signal from asyncio. Add a control queue to each monkey business and use it to signal a clean shutdown. Check for control messages at every pause during monkey execution. Extensively refactor the inheritance hierarchy for monkeys. Changes include: - NotebookRunner now depends on JupyterPythonLoop - Shutting down the lab between executions is now optional - Settle time is applied to all Jupyter monkey business - Business classes document which methods to override - NotebookRunner uses max_executions instead of its own config - The Business base class handles more of the core logic - There is now an explicit shutdown method to do any cleanup - All Jupyter business deletes the lab when it starts - All Jupyter business deletes the lab when it shuts down - All Jupyter business closes the JupyterClient during shutdown --- src/mobu/business/base.py | 86 ++++++++++++++-- src/mobu/business/jupyterjitterloginloop.py | 12 ++- src/mobu/business/jupyterloginloop.py | 107 +++++++++----------- src/mobu/business/jupyterpythonloop.py | 38 ++++--- src/mobu/business/notebookrunner.py | 95 ++++++----------- src/mobu/business/querymonkey.py | 29 ++---- src/mobu/dependencies/manager.py | 4 +- src/mobu/models/business.py | 70 ++++++++----- src/mobu/monkey.py | 34 ++----- tests/autostart_test.py | 2 +- tests/business/jupyterloginloop_test.py | 50 ++++++++- tests/business/jupyterpythonloop_test.py | 56 +++++++--- tests/conftest.py | 18 ++-- tests/handlers/flock_test.py | 16 +-- tests/monkeyflocker_test.py | 2 +- tests/support/jupyter.py | 11 +- 16 files changed, 377 insertions(+), 253 deletions(-) diff --git a/src/mobu/business/base.py b/src/mobu/business/base.py index 
8d2bb0a5..cd3bc177 100644 --- a/src/mobu/business/base.py +++ b/src/mobu/business/base.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio +from asyncio import Queue, QueueEmpty, TimeoutError +from enum import Enum from typing import TYPE_CHECKING from ..models.business import BusinessData @@ -17,9 +19,29 @@ __all__ = ["Business"] +class BusinessCommand(Enum): + """Commands sent over the internal control queue.""" + + STOP = "STOP" + + class Business: """Base class for monkey business (one type of repeated operation). + The basic flow for a monkey business is as follows: + + - Run ``startup`` + - In a loop, run ``execute`` followed by ``idle`` until told to stop + - When told to stop, run ``shutdown`` + + Subclasses should override ``startup``, ``execute``, and ``shutdown`` to + add appropriate behavior. ``idle`` by default waits for ``idle_time``, + which generally does not need to be overridden. + + All delays should be done by calling ``pause``, and the caller should + check ``self.stopping`` and exit any loops if it is `True` after calling + ``pause``. + Parameters ---------- logger : `structlog.BoundLogger` @@ -42,17 +64,69 @@ def __init__( self.success_count = 0 self.failure_count = 0 self.timings = Timings() + self.control: Queue[BusinessCommand] = Queue() + self.stopping = False async def run(self) -> None: - while True: - self.logger.info("Idling...") - with self.timings.start("idle"): - await asyncio.sleep(5) - self.success_count += 1 + """The core business logic, run in a background task.""" + self.logger.info("Starting up...") + await self.startup() - async def stop(self) -> None: + while not self.stopping: + self.logger.info("Starting next iteration") + try: + await self.execute() + self.success_count += 1 + except Exception: + self.failure_count += 1 + raise + await self.idle() + + self.logger.info("Shutting down...") + await self.shutdown() + + # Tell the control channel we've processed the stop command. 
+ self.control.task_done() + + async def startup(self) -> None: + """Run before the start of the first iteration and then not again.""" + pass + + async def execute(self) -> None: + """The business done in each loop.""" pass + async def idle(self) -> None: + """The idle pause at the end of each loop.""" + self.logger.info("Idling...") + with self.timings.start("idle"): + await self.pause(self.config.idle_time) + + async def shutdown(self) -> None: + """Any cleanup to do before exiting after stopping.""" + pass + + async def stop(self) -> None: + """Tell the running background task to stop and wait for that.""" + await self.control.put(BusinessCommand.STOP) + await self.control.join() + self.logger.info("Stopped") + + async def pause(self, seconds: float) -> None: + """Pause for up to the number of seconds, handling commands.""" + if self.stopping: + return + try: + if seconds: + command = await asyncio.wait_for(self.control.get(), seconds) + else: + command = self.control.get_nowait() + except (TimeoutError, QueueEmpty): + return + else: + if command == BusinessCommand.STOP: + self.stopping = True + def dump(self) -> BusinessData: return BusinessData( name=type(self).__name__, diff --git a/src/mobu/business/jupyterjitterloginloop.py b/src/mobu/business/jupyterjitterloginloop.py index 5239e1c1..3e1197ce 100644 --- a/src/mobu/business/jupyterjitterloginloop.py +++ b/src/mobu/business/jupyterjitterloginloop.py @@ -7,7 +7,6 @@ from __future__ import annotations -import asyncio import random from .jupyterloginloop import JupyterLoginLoop @@ -20,14 +19,17 @@ class JupyterJitterLoginLoop(JupyterLoginLoop): async def startup(self) -> None: with self.timings.start("pre_login_delay"): - await asyncio.sleep(random.uniform(0, 30)) + await self.pause(random.uniform(0, 30)) + if self.stopping: + return await super().startup() - await asyncio.sleep(random.uniform(10, 30)) + await self.pause(random.uniform(10, 30)) async def lab_business(self) -> None: with 
self.timings.start("lab_wait"): - await asyncio.sleep(1200 + random.uniform(0, 600)) + await self.pause(1200 + random.uniform(0, 600)) async def idle(self) -> None: + self.logger.info("Idling...") with self.timings.start("idle"): - await asyncio.sleep(30 + random.uniform(0, 60)) + await self.pause(30 + random.uniform(0, 60)) diff --git a/src/mobu/business/jupyterloginloop.py b/src/mobu/business/jupyterloginloop.py index d10e1c35..082eaeba 100644 --- a/src/mobu/business/jupyterloginloop.py +++ b/src/mobu/business/jupyterloginloop.py @@ -6,7 +6,6 @@ from __future__ import annotations -import asyncio from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING @@ -25,6 +24,15 @@ class JupyterLoginLoop(Business): """Business that logs on to the hub, creates a lab, and deletes it. + This class modifies the core `~mobu.business.base.Business` loop by + providing the overall ``execute`` framework and default ``startup`` and + ``shutdown`` methods. It will log on to JupyterHub, ensure no lab + currently exists, create a lab, run ``lab_business``, and then shut down + the lab before starting another iteration. + + Subclasses should override ``lab_business`` to do whatever they want to do + inside a lab. The default behavior just waits for ``login_idle_time``. + Once this business has been stopped, it cannot be started again (the `aiohttp.ClientSession` will be closed), and the instance should be dropped after retrieving any status information. 
@@ -40,85 +48,64 @@ def __init__( self._client = JupyterClient(user, logger, business_config) self._last_login = datetime.fromtimestamp(0, tz=timezone.utc) - async def run(self) -> None: - self.logger.info("Starting up...") - await self.startup() - while True: - await self.reauth_if_needed() - self.logger.info("Starting next iteration") - try: - await self.ensure_lab() - await self.lab_business() - await self.delete_lab() - self.success_count += 1 - except Exception: - self.failure_count += 1 - raise - await self.lab_idle() - async def startup(self) -> None: - """Run before the start of the first iteration and then not again.""" await self.hub_login() + await self.initial_delete_lab() + + async def execute(self) -> None: + """The work done in each iteration of the loop.""" + await self.reauth_if_needed() + await self.ensure_lab() + await self.lab_settle() + if self.stopping: + return + await self.lab_business() + if self.config.delete_lab: + await self.delete_lab() + + async def shutdown(self) -> None: + await self.delete_lab() + await self._client.close() async def hub_login(self) -> None: + self.logger.info("Logging in to hub") with self.timings.start("hub_login"): await self._client.hub_login() - self._last_login = self._now() + self._last_login = datetime.now(tz=timezone.utc) async def ensure_lab(self) -> None: with self.timings.start("ensure_lab"): await self._client.ensure_lab() - self.logger.info("Lab created.") + self.logger.info("Lab created") + + async def lab_settle(self) -> None: + with self.timings.start("lab_settle"): + await self.pause(self.config.settle_time) async def delete_lab(self) -> None: - self.logger.info("Deleting lab.") + self.logger.info("Deleting lab") with self.timings.start("delete_lab"): await self._client.delete_lab() - self.logger.info("Lab successfully deleted.") + self.logger.info("Lab successfully deleted") + + async def initial_delete_lab(self) -> None: + self.logger.info("Deleting any existing lab") + with 
self.timings.start("initial_delete_lab"): + await self._client.delete_lab() async def lab_business(self) -> None: """Do whatever business we want to do inside a lab. - Placeholder function intended to be overridden by subclasses. + Placeholder function intended to be overridden by subclasses. The + default behavior is to wait a minute and then shut the lab down + again. """ with self.timings.start("lab_wait"): - await asyncio.sleep(5) - - async def lab_idle(self) -> None: - """Executed at the end of each iteration for a given lab. - - Intended to be overridden by subclasses if they want different idle - behavior. - """ - delay = self.config.lab_idle_time - if delay > 0: - with self.timings.start("idle"): - await asyncio.sleep(delay) - - async def execution_idle(self) -> None: - """Executed between each unit of work execution (usually a Lab - cell). - """ - delay = self.config.execution_idle_time - if delay > 0: - with self.timings.start("execution_idle"): - await asyncio.sleep(self.config.execution_idle_time) - - def _now(self) -> datetime: - return datetime.now(timezone.utc) + await self.pause(self.config.login_idle_time) async def reauth_if_needed(self) -> None: - now = self._now() - elapsed = now - self._last_login + elapsed = datetime.now(tz=timezone.utc) - self._last_login if elapsed > timedelta(self.config.reauth_interval): - await self.hub_reauth() - - async def hub_reauth(self) -> None: - self.logger.info("Reauthenticating to Hub") - with self.timings.start("hub_reauth"): - await self._client.hub_login() - - async def stop(self) -> None: - with self.timings.start("delete_lab_on_stop"): - await self._client.delete_lab() - await self._client.close() + self.logger.info("Reauthenticating to Hub") + with self.timings.start("hub_reauth"): + await self._client.hub_login() diff --git a/src/mobu/business/jupyterpythonloop.py b/src/mobu/business/jupyterpythonloop.py index 749816da..ac7d088f 100644 --- a/src/mobu/business/jupyterpythonloop.py +++ 
b/src/mobu/business/jupyterpythonloop.py @@ -11,14 +11,20 @@ class JupyterPythonLoop(JupyterLoginLoop): - """Run simple Python code in a loop inside a lab kernel.""" + """Run simple Python code in a loop inside a lab kernel. + + This can be used as a base class for other JupyterLab code execution + monkey business. Override ``execute_code`` to change what code is + executed. When doing so, be sure to call ``execute_idle`` between each + code execution and check ``self.stopping`` after it returns, exiting any + loops if ``self.stopping`` is true. + """ async def lab_business(self) -> None: - await self.reauth_if_needed() + if self.stopping: + return session = await self.create_session() - for count in range(self.config.max_executions): - await self.execute_code(session, self.config.code) - await self.execution_idle() + await self.execute_code(session) await self.delete_session(session) async def create_session(self) -> JupyterLabSession: @@ -27,13 +33,21 @@ async def create_session(self) -> JupyterLabSession: session = await self._client.create_labsession() return session - async def execute_code( - self, session: JupyterLabSession, code: str - ) -> None: - with self.timings.start("execute_code", {"code": code}) as sw: - reply = await self._client.run_python(session, code) - sw.annotation["result"] = reply - self.logger.info(f"{code} -> {reply}") + async def execute_code(self, session: JupyterLabSession) -> None: + code = self.config.code + for count in range(self.config.max_executions): + with self.timings.start("execute_code", {"code": code}) as sw: + reply = await self._client.run_python(session, code) + sw.annotation["result"] = reply + self.logger.info(f"{code} -> {reply}") + await self.execution_idle() + if self.stopping: + break + + async def execution_idle(self) -> None: + """Executed between each unit of work execution.""" + with self.timings.start("execution_idle"): + await self.pause(self.config.execution_idle_time) async def delete_session(self, session: 
JupyterLabSession) -> None: self.logger.info("delete_session") diff --git a/src/mobu/business/notebookrunner.py b/src/mobu/business/notebookrunner.py index 2a13322c..20bd37e5 100644 --- a/src/mobu/business/notebookrunner.py +++ b/src/mobu/business/notebookrunner.py @@ -6,7 +6,6 @@ from __future__ import annotations -import asyncio import json import os from pathlib import Path @@ -17,7 +16,7 @@ from ..jupyterclient import JupyterLabSession, NotebookException from ..models.business import BusinessData -from .jupyterloginloop import JupyterLoginLoop +from .jupyterpythonloop import JupyterPythonLoop if TYPE_CHECKING: from typing import Any, Dict, Iterator, List, Optional @@ -30,7 +29,7 @@ __all__ = ["NotebookRunner"] -class NotebookRunner(JupyterLoginLoop): +class NotebookRunner(JupyterPythonLoop): """Start a Jupyter lab and run a sequence of notebooks.""" def __init__( @@ -47,31 +46,6 @@ def __init__( self._repo: Optional[git.Repo] = None self._notebook_iterator: Optional[Iterator[os.DirEntry]] = None - async def run(self) -> None: - self.logger.info("Starting up...") - await self.startup() - while True: - self.logger.info("Starting next iteration") - try: - await self.lab_business() - self.success_count += 1 - except NotebookException as e: - running_code = self.running_code - notebook_name = "no notebook" - if self.notebook: - self._failed_notebooks.append(self.notebook.name) - notebook_name = self.notebook.name - self.logger.error(f"Error running notebook: {notebook_name}") - self.running_code = None - self.failure_count += 1 - raise NotebookException( - f"Running {notebook_name}: '" - f"```{running_code}``` generated: ```{e}```" - ) - except Exception: - self.failure_count += 1 - raise - async def startup(self) -> None: if not self._repo: self.clone_repo() @@ -87,38 +61,28 @@ def clone_repo(self) -> None: with self.timings.start("clone_repo"): self._repo = git.Repo.clone_from(url, path, branch=branch) - async def initial_delete_lab(self) -> None: - with 
self.timings.start("initial_delete_lab"): - await self._client.delete_lab() - - async def lab_business(self) -> None: + async def execute_code(self, session: JupyterLabSession) -> None: self._next_notebook() assert self.notebook - - await self.ensure_lab() - await self.lab_settle() - session = await self.create_session() - self.logger.info(f"Starting notebook: {self.notebook.name}") cells = self.read_notebook(self.notebook.name, self.notebook.path) - for count in range(self.config.notebook_iterations): - iteration = f"{count + 1}/{self.config.notebook_iterations}" + for count in range(self.config.max_executions): + iteration = f"{count + 1}/{self.config.max_executions}" msg = f"Notebook '{self.notebook.name}' iteration {iteration}" self.logger.info(msg) await self.reauth_if_needed() for cell in cells: self.running_code = "".join(cell["source"]) - await self.execute_code(session, self.running_code) + await self.execute_cell(session, self.running_code) + self.running_code = None await self.execution_idle() + if self.stopping: + break - self.running_code = None - await self.delete_session(session) - self.logger.info(f"Success running notebook: {self.notebook.name}") - - async def lab_settle(self) -> None: - with self.timings.start("lab_settle"): - await asyncio.sleep(self.config.settle_time) + if self.stopping: + break + self.logger.info(f"Success running notebook: {self.notebook.name}") def read_notebook(self, name: str, path: str) -> List[Dict[str, Any]]: with self.timings.start(f"read_notebook:{name}"): @@ -127,27 +91,36 @@ def read_notebook(self, name: str, path: str) -> List[Dict[str, Any]]: return [c for c in cells if c["cell_type"] == "code"] async def create_session(self) -> JupyterLabSession: + """Override create_session to add the notebook name.""" self.logger.info("create_session") notebook_name = self.notebook.name if self.notebook else None with self.timings.start("create_session"): session = await self._client.create_labsession( - 
notebook_name=notebook_name, + notebook_name=notebook_name ) return session - async def execute_code( + async def execute_cell( self, session: JupyterLabSession, code: str ) -> None: self.logger.info("Executing:\n%s\n", code) - with self.timings.start("run_code", {"code": code}) as sw: - reply = await self._client.run_python(session, code) - sw.annotation["result"] = reply - self.logger.info(f"Result:\n{reply}\n") - - async def delete_session(self, session: JupyterLabSession) -> None: - self.logger.info(f"Deleting session {session}") - with self.timings.start("delete_session"): - await self._client.delete_labsession(session) + try: + with self.timings.start("run_code", {"code": code}) as sw: + reply = await self._client.run_python(session, code) + sw.annotation["result"] = reply + self.logger.info(f"Result:\n{reply}\n") + except NotebookException as e: + running_code = self.running_code + notebook_name = "no notebook" + if self.notebook: + self._failed_notebooks.append(self.notebook.name) + notebook_name = self.notebook.name + self.logger.error(f"Error running notebook: {notebook_name}") + self.running_code = None + raise NotebookException( + f"Running {notebook_name}: '" + f"```{running_code}``` generated: ```{e}```" + ) def dump(self) -> BusinessData: data = super().dump() @@ -162,8 +135,6 @@ def _next_notebook(self) -> None: while not self.notebook.path.endswith(".ipynb"): self.notebook = next(self._notebook_iterator) except StopIteration: - self.logger.info( - "Done with this cycle of notebooks, recreating lab." 
- ) + self.logger.info("Done with this cycle of notebooks") self._notebook_iterator = os.scandir(self._repo_dir.name) self._next_notebook() diff --git a/src/mobu/business/querymonkey.py b/src/mobu/business/querymonkey.py index a083f49a..c7612244 100644 --- a/src/mobu/business/querymonkey.py +++ b/src/mobu/business/querymonkey.py @@ -69,22 +69,15 @@ def _make_client(token: str) -> pyvo.dal.TAPService: return pyvo.dal.TAPService(tap_url, auth) - async def run(self) -> None: - self.logger.info("Starting up...") + async def startup(self) -> None: templates = self._env.list_templates() self.logger.info("Query templates to choose from: %s", templates) - while True: - template_name = random.choice(self._env.list_templates()) - template = self._env.get_template(template_name) - query = template.render(generate_parameters()) - try: - await self.run_query(query) - self.success_count += 1 - except Exception: - self.failure_count += 1 - raise - await asyncio.sleep(60) + async def execute(self) -> None: + template_name = random.choice(self._env.list_templates()) + template = self._env.get_template(template_name) + query = template.render(generate_parameters()) + await self.run_query(query) async def run_query(self, query: str) -> None: self.logger.info("Running: %s", query) @@ -93,13 +86,3 @@ async def run_query(self, query: str) -> None: await loop.run_in_executor(None, self._client.search, query) elapsed = sw.elapsed.total_seconds() self.logger.info(f"Query finished after {elapsed} seconds") - - async def stop(self) -> None: - # There's nothing to do since we use synchronous queries. 
If we use - # async queries, this should do: - # - # loop = asyncio.get_event_loop() - # with self.timings.start("delete_tap_client_on_stop"): - # await loop.run_in_executor(None, self._client.abort) - # await loop.run_in_executor(None, self._client.delete) - pass diff --git a/src/mobu/dependencies/manager.py b/src/mobu/dependencies/manager.py index 317a9e3b..5f2a82f9 100644 --- a/src/mobu/dependencies/manager.py +++ b/src/mobu/dependencies/manager.py @@ -30,13 +30,15 @@ async def init(self) -> None: self._session = ClientSession() async def cleanup(self) -> None: + for flock in self._flocks.values(): + await flock.stop() + self._flocks.clear() if self._scheduler is not None: await self._scheduler.close() self._scheduler = None if self._session: await self._session.close() self._session = None - self._flocks.clear() async def start_flock(self, flock_config: FlockConfig) -> Flock: if self._scheduler is None or not self._session: diff --git a/src/mobu/models/business.py b/src/mobu/models/business.py index fe867b1f..969bae30 100644 --- a/src/mobu/models/business.py +++ b/src/mobu/models/business.py @@ -29,16 +29,6 @@ class BusinessConfig(BaseModel): default_factory=dict, title="Values to POST to the spawn options form" ) - notebook_iterations: int = Field( - 1, - title="How many iterations through the notebooks", - description=( - "After each iteration, the kernel is restarted." - " Only used by the NotebookRunner." - ), - example=10, - ) - code: str = Field( "print(2+2)", title="Python code to execute", @@ -61,39 +51,73 @@ class BusinessConfig(BaseModel): settle_time: int = Field( 10, title="How long to wait after lab creation in seconds", - description="Only used by the NotebookRunner", + description=( + "Only used by the NotebookRunner. It will wait for this long" + " after lab creation before trying to create a session." 
+ ), example=10, ) - lab_idle_time: int = Field( - 20, - title="How long to wait at end of lab loop in seconds", - description="Used by JupyterLoginLoop", - example=20, + idle_time: int = Field( + 60, + title="How long to wait between business executions", + description=( + "AFter each loop executing monkey business, the monkey will" + " pause for this long in seconds" + ), + example=60, + ) + + login_idle_time: int = Field( + 60, + title="Time to pause after spawning lab", + description=( + "Only used by JupyterLoginLoop and JupyterJitterLoginLoop." + " How long to wait after spawning the lab before destroying" + " it again." + ), + example=60, ) execution_idle_time: int = Field( - 0, + 1, title="How long to wait between cell executions in seconds", description="Used by JupyterPythonLoop and NotebookRunner", example=1, ) reauth_interval: int = Field( - 2700, + 30 * 60, title="Time between reauthentication attempts in seconds", - description="Used by JupyterLoginLoop, JupyterPythonLoop, and" - " NotebookRunner", - example=2700, + description=( + "Used by JupyterLoginLoop, JupyterPythonLoop, and NotebookRunner." + " JupyterHub appears to issue tokens with a one hour lifetime." + ), + example=30 * 60, ) max_executions: int = Field( 25, - title="How many cells to execute in a given kernel session", - description="Only used by JupyterPythonLoop", + title="How much to execute in a given lab and session", + description=( + "For JupyterPythonLoop, this is the number of code snippets to" + " execute before restarting the lab. For NotebookRunner, it's" + " the number of notebooks." + ), example=25, ) + delete_lab: bool = Field( + True, + title="Whether to delete the lab between iterations", + description=( + "By default, the lab is deleted and recreated after each" + " iteration of monkey business involving JupyterLab. Set this" + " to False to keep the same lab." 
+ ), + example=True, + ) + class BusinessData(BaseModel): """Status of a running business.""" diff --git a/src/mobu/monkey.py b/src/mobu/monkey.py index 609a1bfc..21abb23f 100644 --- a/src/mobu/monkey.py +++ b/src/mobu/monkey.py @@ -11,9 +11,7 @@ import structlog from aiohttp import ClientSession -from aiohttp.client_exceptions import ClientConnectorError from aiojobs import Scheduler -from aiojobs._job import Job from .config import config from .models.monkey import MonkeyData, MonkeyState @@ -21,6 +19,8 @@ if TYPE_CHECKING: from typing import Optional, Type + from aiojobs._job import Job + from .business.base import Business from .models.monkey import MonkeyConfig from .models.user import AuthenticatedUser @@ -100,47 +100,35 @@ async def start(self, scheduler: Scheduler) -> None: async def _runner(self) -> None: run = True + while run: try: self.state = MonkeyState.RUNNING await self.business.run() - self.state = MonkeyState.FINISHED - except asyncio.CancelledError: - self.state = MonkeyState.STOPPING - self.log.info("Shutting down") run = False - try: - await self.business.stop() - except ClientConnectorError: - # Ripping down async sessions can cause various parts of - # a communication in flight to fail. Just swallow it, - # since we're shutting down anyway. - pass - self.state = MonkeyState.FINISHED except Exception as e: - self.state = MonkeyState.ERROR self.log.exception( "Exception thrown while doing monkey business." ) # Just pass the exception message - the callstack will # be logged but will probably be too spammy to report. 
await self.alert(str(e)) - run = self.restart + run = self.restart and self.state == MonkeyState.RUNNING + if self.state == MonkeyState.RUNNING: + self.state = MonkeyState.ERROR + await self.business.stop() + if run: await asyncio.sleep(60) + self.state = MonkeyState.FINISHED + async def stop(self) -> None: if self.state == MonkeyState.FINISHED: return self.state = MonkeyState.STOPPING await self.business.stop() if self._job: - try: - await self._job.close(timeout=0) - except (asyncio.TimeoutError, asyncio.exceptions.CancelledError): - # Close will normally wait for a timeout to occur before - # throwing a timeout exception, but we'll just shut it down - # right away and eat the exception. - pass + await self._job.wait() self.state = MonkeyState.FINISHED def dump(self) -> MonkeyData: diff --git a/tests/autostart_test.py b/tests/autostart_test.py index 04828d4d..007bbf85 100644 --- a/tests/autostart_test.py +++ b/tests/autostart_test.py @@ -47,7 +47,7 @@ @pytest.fixture(autouse=True) def configure_autostart( - tmp_path: Path, jupyter: None, mock_aioresponses: aioresponses + tmp_path: Path, mock_aioresponses: aioresponses ) -> Iterator[None]: """Set up the autostart configuration.""" mock_gafaelfawr(mock_aioresponses) diff --git a/tests/business/jupyterloginloop_test.py b/tests/business/jupyterloginloop_test.py index c51e0838..e9928e33 100644 --- a/tests/business/jupyterloginloop_test.py +++ b/tests/business/jupyterloginloop_test.py @@ -9,15 +9,18 @@ import pytest from tests.support.gafaelfawr import mock_gafaelfawr +from tests.support.jupyter import JupyterState if TYPE_CHECKING: from aioresponses import aioresponses from httpx import AsyncClient + from tests.support.jupyter import MockJupyter + @pytest.mark.asyncio async def test_run( - client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses + client: AsyncClient, jupyter: MockJupyter, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) @@ -28,7 +31,7 @@ async def test_run( 
"count": 1, "user_spec": {"username_prefix": "testuser", "uid_start": 1000}, "scopes": ["exec:notebook"], - "options": {"idle_time": 2}, + "options": {"settle_time": 0, "login_idle_time": 0}, "business": "JupyterLoginLoop", }, ) @@ -37,7 +40,7 @@ async def test_run( # Wait until we've finished at least one loop. Make sure nothing fails. finished = False while not finished: - await asyncio.sleep(1) + await asyncio.sleep(0.5) r = await client.get("/mobu/flocks/test/monkeys/testuser1") assert r.status_code == 200 data = r.json() @@ -52,7 +55,7 @@ async def test_run( "business": { "failure_count": 0, "name": "JupyterLoginLoop", - "success_count": ANY, + "success_count": 1, "timings": ANY, }, "restart": False, @@ -65,6 +68,9 @@ async def test_run( }, } + # Check that the lab is shut down properly between iterations. + assert jupyter.state["testuser1"] == JupyterState.LOGGED_IN + r = await client.get("/mobu/flocks/test/monkeys/testuser1/log") assert r.status_code == 200 assert "Starting up" in r.text @@ -72,3 +78,39 @@ async def test_run( r = await client.delete("/mobu/flocks/test") assert r.status_code == 204 + + +@pytest.mark.asyncio +async def test_reuse_lab( + client: AsyncClient, jupyter: MockJupyter, mock_aioresponses: aioresponses +) -> None: + mock_gafaelfawr(mock_aioresponses) + + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 1, + "user_spec": {"username_prefix": "testuser", "uid_start": 1000}, + "scopes": ["exec:notebook"], + "options": { + "settle_time": 0, + "login_idle_time": 0, + "delete_lab": False, + }, + "business": "JupyterLoginLoop", + }, + ) + assert r.status_code == 201 + + # Wait until we've finished at least one loop. Make sure nothing fails. 
+ finished = False + while not finished: + await asyncio.sleep(0.5) + r = await client.get("/mobu/flocks/test/monkeys/testuser1") + assert r.status_code == 200 + if r.json()["business"]["success_count"] > 0: + finished = True + + # Check that the lab is still running between iterations. + assert jupyter.state["testuser1"] == JupyterState.LAB_RUNNING diff --git a/tests/business/jupyterpythonloop_test.py b/tests/business/jupyterpythonloop_test.py index 90c5bf7b..dc704fe4 100644 --- a/tests/business/jupyterpythonloop_test.py +++ b/tests/business/jupyterpythonloop_test.py @@ -17,7 +17,7 @@ @pytest.mark.asyncio async def test_run( - client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses + client: AsyncClient, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) @@ -28,12 +28,23 @@ async def test_run( "count": 1, "user_spec": {"username_prefix": "testuser", "uid_start": 1000}, "scopes": ["exec:notebook"], - "options": {"max_executions": 3}, + "options": {"settle_time": 0, "max_executions": 3}, "business": "JupyterPythonLoop", }, ) assert r.status_code == 201 + # Wait until we've finished at least one loop. Make sure nothing fails. + finished = False + while not finished: + await asyncio.sleep(1) + r = await client.get("/mobu/flocks/test/monkeys/testuser1") + assert r.status_code == 200 + data = r.json() + assert data["business"]["failure_count"] == 0 + if data["business"]["success_count"] > 0: + finished = True + r = await client.get("/mobu/flocks/test/monkeys/testuser1") assert r.status_code == 200 assert r.json() == { @@ -41,11 +52,11 @@ async def test_run( "business": { "failure_count": 0, "name": "JupyterPythonLoop", - "success_count": ANY, + "success_count": 1, "timings": ANY, }, "restart": False, - "state": ANY, + "state": "RUNNING", "user": { "scopes": ["exec:notebook"], "token": ANY, @@ -54,17 +65,6 @@ async def test_run( }, } - # Wait until we've finished at least one loop. Make sure nothing fails. 
- finished = False - while not finished: - await asyncio.sleep(1) - r = await client.get("/mobu/flocks/test/monkeys/testuser1") - assert r.status_code == 200 - data = r.json() - assert data["business"]["failure_count"] == 0 - if data["business"]["success_count"] > 0: - finished = True - # Get the client log and check no exceptions were thrown. r = await client.get("/mobu/flocks/test/monkeys/testuser1/log") assert r.status_code == 200 @@ -72,3 +72,29 @@ async def test_run( r = await client.delete("/mobu/flocks/test") assert r.status_code == 204 + + +@pytest.mark.asyncio +async def test_server_shutdown( + client: AsyncClient, mock_aioresponses: aioresponses +) -> None: + mock_gafaelfawr(mock_aioresponses) + + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 20, + "user_spec": {"username_prefix": "testuser", "uid_start": 1000}, + "scopes": ["exec:notebook"], + "options": {"settle_time": 0, "max_executions": 3}, + "business": "JupyterPythonLoop", + }, + ) + assert r.status_code == 201 + + # Wait for a second so that all the monkeys get started. + await asyncio.sleep(1) + + # Now end the test without shutting anything down explicitly. This tests + # that server shutdown correctly stops everything and cleans up resources. diff --git a/tests/conftest.py b/tests/conftest.py index 33bf9043..a31905ce 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,7 +14,7 @@ from mobu.config import config from mobu.jupyterclient import JupyterClient from tests.support.gafaelfawr import make_gafaelfawr_token -from tests.support.jupyter import mock_jupyter +from tests.support.jupyter import MockJupyter, mock_jupyter if TYPE_CHECKING: from typing import AsyncIterator, Iterator @@ -33,7 +33,7 @@ def configure() -> Iterator[None]: minimal test configuration and a unique admin token that is replaced after the test runs. 
""" - config.environment_url = "https://test.example.com/" + config.environment_url = "https://test.example.com" config.gafaelfawr_token = make_gafaelfawr_token() yield config.environment_url = "" @@ -41,11 +41,17 @@ def configure() -> Iterator[None]: @pytest.fixture -async def app() -> AsyncIterator[FastAPI]: +async def app(jupyter: MockJupyter) -> AsyncIterator[FastAPI]: """Return a configured test application. Wraps the application in a lifespan manager so that startup and shutdown events are sent during test execution. + + Notes + ----- + This must depend on the Jupyter mock since otherwise the JupyterClient + mocking is undone before the app is shut down, which causes it to try to + make real web socket calls. """ async with LifespanManager(main.app): yield main.app @@ -59,9 +65,9 @@ async def client(app: FastAPI) -> AsyncIterator[AsyncClient]: @pytest.fixture -def jupyter(mock_aioresponses: aioresponses) -> Iterator[None]: +def jupyter(mock_aioresponses: aioresponses) -> Iterator[MockJupyter]: """Mock out JupyterHub/Lab.""" - mock_jupyter(mock_aioresponses) + jupyter_mock = mock_jupyter(mock_aioresponses) # aioresponses has no mechanism to mock ws_connect, so we can't properly # test JupyterClient.run_python. For now, just mock it out entirely. @@ -71,7 +77,7 @@ def jupyter(mock_aioresponses: aioresponses) -> Iterator[None]: # reusing the websocket. 
with patch.object(JupyterClient, "_websocket_connect") as mock2: mock2.return_value = AsyncMock() - yield + yield jupyter_mock @pytest.fixture diff --git a/tests/handlers/flock_test.py b/tests/handlers/flock_test.py index 4878fc7e..c6ac7866 100644 --- a/tests/handlers/flock_test.py +++ b/tests/handlers/flock_test.py @@ -18,7 +18,7 @@ @pytest.mark.asyncio async def test_start_stop( - client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses + client: AsyncClient, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) @@ -46,7 +46,7 @@ async def test_start_stop( "business": { "failure_count": 0, "name": "Business", - "success_count": 0, + "success_count": ANY, "timings": ANY, }, "restart": False, @@ -116,7 +116,7 @@ async def test_start_stop( @pytest.mark.asyncio async def test_user_list( - client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses + client: AsyncClient, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) @@ -147,7 +147,7 @@ async def test_user_list( "business": { "failure_count": 0, "name": "Business", - "success_count": 0, + "success_count": ANY, "timings": ANY, }, "restart": False, @@ -164,7 +164,7 @@ async def test_user_list( "business": { "failure_count": 0, "name": "Business", - "success_count": 0, + "success_count": ANY, "timings": ANY, }, "restart": False, @@ -189,13 +189,13 @@ async def test_user_list( assert r.status_code == 200 assert r.json() == expected["monkeys"][1] - # Intentionally do not delete the flock to check whether aiojobs will - # shut down properly when the server is shut down. + # Intentionally do not delete the flock to check whether we shut + # everything down properly when the server is shut down. 
@pytest.mark.asyncio async def test_errors( - client: AsyncClient, jupyter: None, mock_aioresponses: aioresponses + client: AsyncClient, mock_aioresponses: aioresponses ) -> None: mock_gafaelfawr(mock_aioresponses) diff --git a/tests/monkeyflocker_test.py b/tests/monkeyflocker_test.py index 5d3073d1..382d9320 100644 --- a/tests/monkeyflocker_test.py +++ b/tests/monkeyflocker_test.py @@ -144,7 +144,7 @@ def test_start_report_stop(tmp_path: Path, app_url: str) -> None: "business": { "failure_count": 0, "name": "Business", - "success_count": 0, + "success_count": ANY, "timings": ANY, }, "restart": False, diff --git a/tests/support/jupyter.py b/tests/support/jupyter.py index e29df981..93408762 100644 --- a/tests/support/jupyter.py +++ b/tests/support/jupyter.py @@ -49,7 +49,9 @@ def __init__(self) -> None: def login(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) - self.state[user] = JupyterState.LOGGED_IN + state = self.state.get(user, JupyterState.LOGGED_OUT) + if state == JupyterState.LOGGED_OUT: + self.state[user] = JupyterState.LOGGED_IN return CallbackResult(status=200) def hub(self, url: str, **kwargs: Any) -> CallbackResult: @@ -95,7 +97,9 @@ def lab(self, url: str, **kwargs: Any) -> CallbackResult: def delete_lab(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) assert str(url).endswith(f"/users/{user}/server") - self.state[user] = JupyterState.LOGGED_OUT + state = self.state.get(user, JupyterState.LOGGED_OUT) + assert state != JupyterState.LOGGED_OUT + self.state[user] = JupyterState.LOGGED_IN return CallbackResult(status=202) def create_session(self, url: str, **kwargs: Any) -> CallbackResult: @@ -139,7 +143,7 @@ def _get_user(authorization: str) -> str: return user.decode() -def mock_jupyter(mocked: aioresponses) -> None: +def mock_jupyter(mocked: aioresponses) -> MockJupyter: """Set up a mock JupyterHub/Lab that always returns success. 
Currently only handles a lab spawn and then shutdown. Behavior will @@ -173,3 +177,4 @@ def mock_jupyter(mocked: aioresponses) -> None: callback=mock.delete_session, repeat=True, ) + return mock From 32196ebc2294d53d240796bddf222430e270c7eb Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 11:15:42 -0700 Subject: [PATCH 02/14] Reauthenticate to lab before deleting a session One hypothesis of the cause of the 403 errors is that the lab OAuth 2 authentication has expired and the lab doesn't redirect for authentication on DELETE or until the token has fully expired. Test this by attempting a lab login immediately before doing the delete. --- src/mobu/jupyterclient.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index 4908620a..e0b3438e 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -268,6 +268,7 @@ async def _websocket_connect( return await self.session.ws_connect(channels_url) async def delete_labsession(self, session: JupyterLabSession) -> None: + await self.lab_login() session_url = ( self.jupyter_url + f"user/{self.user.username}/api/sessions/{session.session_id}" From 5c4274517af51149b517b9fc658d46dff9f5d1ad Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 11:28:35 -0700 Subject: [PATCH 03/14] Poll the Hub URL instead of pending spawn URL After spawning a lab, the web browser retrieves a progress URL. It doesn't keep trying to reload the spawn-pending URL, and we're seeing occasional 503 errors polling that URL. My theory is that the Hub decommissions the spawn-pending URL after the lab is spawned and we're requesting it after that has happened. For now, rather than trying to understand the results of the progress URL, poll the hub URL instead. Once there is a spawned pod, it will redirect to the lab URL. See if this is more reliable. 
--- src/mobu/jupyterclient.py | 6 ++---- tests/support/jupyter.py | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index e0b3438e..9a55c01e 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -174,9 +174,7 @@ async def is_lab_running(self) -> bool: async def spawn_lab(self) -> None: spawn_url = self.jupyter_url + "hub/spawn" - pending_url = ( - self.jupyter_url + f"hub/spawn-pending/{self.user.username}" - ) + hub_url = self.jupyter_url + "hub" lab_url = self.jupyter_url + f"user/{self.user.username}/lab" # DM-23864: Do a get on the spawn URL even if I don't have to. @@ -202,7 +200,7 @@ async def spawn_lab(self) -> None: retries = max_poll_secs / poll_interval while retries > 0: - async with self.session.get(pending_url) as r: + async with self.session.get(hub_url) as r: if str(r.url) == lab_url: self.log.info(f"Lab spawned, redirected to {r.url}") return diff --git a/tests/support/jupyter.py b/tests/support/jupyter.py index 93408762..5d159fe5 100644 --- a/tests/support/jupyter.py +++ b/tests/support/jupyter.py @@ -64,7 +64,7 @@ def hub(self, url: str, **kwargs: Any) -> CallbackResult: elif state == JupyterState.SPAWN_PENDING: redirect_to = _url(f"hub/spawn-pending/{user}") elif state == JupyterState.LAB_RUNNING: - redirect_to = _url(f"hub/spawn-pending/{user}") + redirect_to = _url(f"user/{user}/lab") return CallbackResult(status=307, headers={"Location": redirect_to}) def spawn(self, url: str, **kwargs: Any) -> CallbackResult: From e11a149d2324fef542fb0270e51e8f77c8bbd618 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 11:40:25 -0700 Subject: [PATCH 04/14] Remove duplicate lab deletion from NotebookRunner This is now handled by JupyterLoginLoop. 
--- src/mobu/business/notebookrunner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mobu/business/notebookrunner.py b/src/mobu/business/notebookrunner.py index 20bd37e5..272fd230 100644 --- a/src/mobu/business/notebookrunner.py +++ b/src/mobu/business/notebookrunner.py @@ -52,7 +52,6 @@ async def startup(self) -> None: self._notebook_iterator = os.scandir(self._repo_dir.name) self.logger.info("Repository cloned and ready") await super().startup() - await self.initial_delete_lab() def clone_repo(self) -> None: url = self.config.repo_url From 08ef6d96c63a26501346f3df4e134269cb973fc0 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 11:53:54 -0700 Subject: [PATCH 05/14] Do not stop a business on error An error already aborts anything the business is doing, so there's nothing listening to the queue and it's meaningless to attempt to stop it again. This was intended to do additional cleanup, but that cleanup needs to happen at the start of a new run. --- src/mobu/monkey.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mobu/monkey.py b/src/mobu/monkey.py index 21abb23f..452d0ac7 100644 --- a/src/mobu/monkey.py +++ b/src/mobu/monkey.py @@ -116,7 +116,6 @@ async def _runner(self) -> None: run = self.restart and self.state == MonkeyState.RUNNING if self.state == MonkeyState.RUNNING: self.state = MonkeyState.ERROR - await self.business.stop() if run: await asyncio.sleep(60) From 27ea33068d75a7d6157c5806213270defdfba274 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 12:06:50 -0700 Subject: [PATCH 06/14] Fix default JupyterPythonLoop code The end="" part of the code was lost in other changes, but it makes the logs cleaner and easier to read. 
--- src/mobu/models/business.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mobu/models/business.py b/src/mobu/models/business.py index 969bae30..398c25f4 100644 --- a/src/mobu/models/business.py +++ b/src/mobu/models/business.py @@ -30,10 +30,10 @@ class BusinessConfig(BaseModel): ) code: str = Field( - "print(2+2)", + 'print(2+2, end="")', title="Python code to execute", description="Only used by JupyterPythonLoop", - example="print(2+2)", + example='print(2+2, end="")', ) repo_url: str = Field( From b8ef70961ad812c5302e19984b7d394915ddd241 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 12:20:26 -0700 Subject: [PATCH 07/14] Fix desynchronization in web socket We return immediately when we get the first stream reply from an execute request on the web socket. Now that we're reusing the web socket, this results in only the first execution result being returned. That's because we leave the execute_reply message in the web socket and don't check its message ID. The execution sequence is therefore: - Send the first request - Retrieve a stream response and return it - Send the second request - Retrieve the execute_reply response from the first and return the empty string - Send the third request - Retrieve the stream response from the second request and ignore it because the message ID is wrong - Retrieve the execute_reply from the second response and return the empty string This continues until the session is closed. Fix this by, one, always checking the message ID of the response and complaining about unexpected responses, and two, accumulating all stream responses in a result buffer and not returning them until we see the execute_reply.
--- src/mobu/jupyterclient.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index 9a55c01e..b5e7941f 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -310,25 +310,25 @@ async def run_python(self, session: JupyterLabSession, code: str) -> str: await session.websocket.send_json(msg) + result = "" while True: r = await session.websocket.receive_json() self.log.debug(f"Recieved kernel message: {r}") msg_type = r["msg_type"] + if msg_id != r["parent_header"]["msg_id"]: + self.log.warning(f"Unexpected kernel message: {r}", r) + continue if msg_type == "error": error_message = "".join(r["content"]["traceback"]) raise NotebookException(self._ansi_escape(error_message)) - elif ( - msg_type == "stream" and msg_id == r["parent_header"]["msg_id"] - ): - return r["content"]["text"] + elif msg_type == "stream": + result += r["content"]["text"] elif msg_type == "execute_reply": status = r["content"]["status"] if status == "ok": - return "" + return result else: - raise NotebookException( - f"Error content status is {status}" - ) + raise NotebookException(f"Result status is {status}") async def _raise_error(self, msg: str, r: ClientResponse) -> None: raise Exception(f"{msg}: {r.status} {r.url}: {r.headers}") From d6de507905403a0ddede1c7b763b8ef2bb00ce79 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 12:38:50 -0700 Subject: [PATCH 08/14] Ignore web socket messages not intended for us The web socket is rather chatty with broadcast status messages, so ignore messages that aren't replies to our execute_request. 
--- src/mobu/jupyterclient.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index b5e7941f..1c1e9528 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -315,8 +315,9 @@ async def run_python(self, session: JupyterLabSession, code: str) -> str: r = await session.websocket.receive_json() self.log.debug(f"Recieved kernel message: {r}") msg_type = r["msg_type"] - if msg_id != r["parent_header"]["msg_id"]: - self.log.warning(f"Unexpected kernel message: {r}", r) + if r["parent_header"]["msg_id"] != msg_id: + # Ignore messages not intended for us. The web socket is + # rather chatty with broadcast status messages. continue if msg_type == "error": error_message = "".join(r["content"]["traceback"]) From 94e3b4ef5e919d69205f0a95331a3af2684e370b Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 12:44:50 -0700 Subject: [PATCH 09/14] Iterate through notebooks properly Instead of running a batch of max_executions notebooks, one after the other, we were executing each individual notebook max_executions times and then moving to the next. Fix the way notebook iteration is done to restore the intended behavior. 
--- src/mobu/business/notebookrunner.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/mobu/business/notebookrunner.py b/src/mobu/business/notebookrunner.py index 272fd230..1ec2de3e 100644 --- a/src/mobu/business/notebookrunner.py +++ b/src/mobu/business/notebookrunner.py @@ -61,14 +61,16 @@ def clone_repo(self) -> None: self._repo = git.Repo.clone_from(url, path, branch=branch) async def execute_code(self, session: JupyterLabSession) -> None: - self._next_notebook() - assert self.notebook - self.logger.info(f"Starting notebook: {self.notebook.name}") - cells = self.read_notebook(self.notebook.name, self.notebook.path) for count in range(self.config.max_executions): + self._next_notebook() + assert self.notebook + self.logger.info(f"Starting notebook: {self.notebook.name}") + cells = self.read_notebook(self.notebook.name, self.notebook.path) + iteration = f"{count + 1}/{self.config.max_executions}" msg = f"Notebook '{self.notebook.name}' iteration {iteration}" self.logger.info(msg) + await self.reauth_if_needed() for cell in cells: From 5460022d19fe469cf7ad289565680e2f4a3f03b4 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 12:49:51 -0700 Subject: [PATCH 10/14] Remove the periodic reauth code This should no longer be needed, since we attempt to log on to the lab before doing API operations and that should redirect for authentication if required. 
--- src/mobu/business/jupyterloginloop.py | 12 +----------- src/mobu/business/notebookrunner.py | 2 -- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/src/mobu/business/jupyterloginloop.py b/src/mobu/business/jupyterloginloop.py index 082eaeba..7415f7a4 100644 --- a/src/mobu/business/jupyterloginloop.py +++ b/src/mobu/business/jupyterloginloop.py @@ -6,7 +6,6 @@ from __future__ import annotations -from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING from ..jupyterclient import JupyterClient @@ -46,7 +45,6 @@ def __init__( ) -> None: super().__init__(logger, business_config, user) self._client = JupyterClient(user, logger, business_config) - self._last_login = datetime.fromtimestamp(0, tz=timezone.utc) async def startup(self) -> None: await self.hub_login() @@ -54,13 +52,13 @@ async def startup(self) -> None: async def execute(self) -> None: """The work done in each iteration of the loop.""" - await self.reauth_if_needed() await self.ensure_lab() await self.lab_settle() if self.stopping: return await self.lab_business() if self.config.delete_lab: + await self.hub_login() await self.delete_lab() async def shutdown(self) -> None: @@ -71,7 +69,6 @@ async def hub_login(self) -> None: self.logger.info("Logging in to hub") with self.timings.start("hub_login"): await self._client.hub_login() - self._last_login = datetime.now(tz=timezone.utc) async def ensure_lab(self) -> None: with self.timings.start("ensure_lab"): @@ -102,10 +99,3 @@ async def lab_business(self) -> None: """ with self.timings.start("lab_wait"): await self.pause(self.config.login_idle_time) - - async def reauth_if_needed(self) -> None: - elapsed = datetime.now(tz=timezone.utc) - self._last_login - if elapsed > timedelta(self.config.reauth_interval): - self.logger.info("Reauthenticating to Hub") - with self.timings.start("hub_reauth"): - await self._client.hub_login() diff --git a/src/mobu/business/notebookrunner.py b/src/mobu/business/notebookrunner.py index 
1ec2de3e..d4a7024c 100644 --- a/src/mobu/business/notebookrunner.py +++ b/src/mobu/business/notebookrunner.py @@ -71,8 +71,6 @@ async def execute_code(self, session: JupyterLabSession) -> None: msg = f"Notebook '{self.notebook.name}' iteration {iteration}" self.logger.info(msg) - await self.reauth_if_needed() - for cell in cells: self.running_code = "".join(cell["source"]) await self.execute_cell(session, self.running_code) From 64d5c1d8f5423f20a22087cf829eed9034aea5ec Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 16:45:54 -0700 Subject: [PATCH 11/14] Wait for the lab to shut down, fix shutdown Check whether the lab is still running after shutdown and wait up to ten seconds for the lab to shut down completely. This will help avoid occasional spurious issues when we delete the lab and then start using it before it shuts down. This change uncovered various shutdown concurrency issues. Fix the state handling inside the monkey so that the wait after an error when respawning the business doesn't block shutdown. Ensure that we acknowledge the business shutdown command even if we error in startup after it was sent. Shut down all flocks in parallel during global shutdown. --- src/mobu/business/base.py | 53 ++++++++++++++----------- src/mobu/business/jupyterloginloop.py | 4 +- src/mobu/dependencies/manager.py | 8 ++-- src/mobu/jupyterclient.py | 18 ++++++++- src/mobu/monkey.py | 26 ++++++++---- tests/business/jupyterloginloop_test.py | 30 ++++++++++++++ tests/conftest.py | 5 ++- tests/support/jupyter.py | 22 +++++++--- 8 files changed, 123 insertions(+), 43 deletions(-) diff --git a/src/mobu/business/base.py b/src/mobu/business/base.py index cd3bc177..098976cc 100644 --- a/src/mobu/business/base.py +++ b/src/mobu/business/base.py @@ -36,7 +36,8 @@ class Business: Subclasses should override ``startup``, ``execute``, and ``shutdown`` to add appropriate behavior. ``idle`` by default waits for ``idle_time``, - which generally does not need to be overridden.
+ which generally does not need to be overridden. Subclasses should also + override ``close`` to shut down any object state created in ``__init__``. All delays should be done by calling ``pause``, and the caller should check ``self.stopping`` and exit any loops if it is `True` after calling @@ -67,26 +68,33 @@ def __init__( self.control: Queue[BusinessCommand] = Queue() self.stopping = False + async def close(self) -> None: + """Clean up any business state on shutdown.""" + pass + async def run(self) -> None: """The core business logic, run in a background task.""" self.logger.info("Starting up...") - await self.startup() - - while not self.stopping: - self.logger.info("Starting next iteration") - try: - await self.execute() - self.success_count += 1 - except Exception: - self.failure_count += 1 - raise - await self.idle() - - self.logger.info("Shutting down...") - await self.shutdown() - - # Tell the control channel we've processed the stop command. - self.control.task_done() + try: + await self.startup() + + while not self.stopping: + self.logger.info("Starting next iteration") + try: + await self.execute() + self.success_count += 1 + except Exception: + self.failure_count += 1 + raise + await self.idle() + + self.logger.info("Shutting down...") + await self.shutdown() + await self.close() + finally: + # Tell the control channel we've processed the stop command. 
+ if self.stopping: + self.control.task_done() async def startup(self) -> None: """Run before the start of the first iteration and then not again.""" @@ -108,9 +116,11 @@ async def shutdown(self) -> None: async def stop(self) -> None: """Tell the running background task to stop and wait for that.""" + self.stopping = True await self.control.put(BusinessCommand.STOP) await self.control.join() self.logger.info("Stopped") + await self.close() async def pause(self, seconds: float) -> None: """Pause for up to the number of seconds, handling commands.""" @@ -118,14 +128,11 @@ async def pause(self, seconds: float) -> None: return try: if seconds: - command = await asyncio.wait_for(self.control.get(), seconds) + await asyncio.wait_for(self.control.get(), seconds) else: - command = self.control.get_nowait() + self.control.get_nowait() except (TimeoutError, QueueEmpty): return - else: - if command == BusinessCommand.STOP: - self.stopping = True def dump(self) -> BusinessData: return BusinessData( diff --git a/src/mobu/business/jupyterloginloop.py b/src/mobu/business/jupyterloginloop.py index 7415f7a4..981797e2 100644 --- a/src/mobu/business/jupyterloginloop.py +++ b/src/mobu/business/jupyterloginloop.py @@ -46,6 +46,9 @@ def __init__( super().__init__(logger, business_config, user) self._client = JupyterClient(user, logger, business_config) + async def close(self) -> None: + await self._client.close() + async def startup(self) -> None: await self.hub_login() await self.initial_delete_lab() @@ -63,7 +66,6 @@ async def execute(self) -> None: async def shutdown(self) -> None: await self.delete_lab() - await self._client.close() async def hub_login(self) -> None: self.logger.info("Logging in to hub") diff --git a/src/mobu/dependencies/manager.py b/src/mobu/dependencies/manager.py index 5f2a82f9..f878632e 100644 --- a/src/mobu/dependencies/manager.py +++ b/src/mobu/dependencies/manager.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from typing import Dict, 
List, Optional from aiohttp import ClientSession @@ -30,9 +31,8 @@ async def init(self) -> None: self._session = ClientSession() async def cleanup(self) -> None: - for flock in self._flocks.values(): - await flock.stop() - self._flocks.clear() + awaits = [self.stop_flock(f) for f in self._flocks] + await asyncio.gather(*awaits) if self._scheduler is not None: await self._scheduler.close() self._scheduler = None @@ -63,8 +63,8 @@ async def stop_flock(self, name: str) -> None: flock = self._flocks.get(name) if flock is None: raise FlockNotFoundException(name) - await flock.stop() del self._flocks[name] + await flock.stop() monkey_business_manager = MonkeyBusinessManager() diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index 1c1e9528..5f9a6a70 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -164,10 +164,14 @@ async def is_lab_running(self) -> bool: async with self.session.get(hub_url) as r: if r.status != 200: self.log.error(f"Error {r.status} from {r.url}") + return False spawn_url = self.jupyter_url + "hub/spawn" + spawn_pending_url = ( + self.jupyter_url + f"hub/spawn-pending/{self.user.username}" + ) self.log.info(f"Going to {hub_url} redirected to {r.url}") - if str(r.url) == spawn_url: + if str(r.url) in [spawn_url, spawn_pending_url]: return False return True @@ -226,6 +230,18 @@ async def delete_lab(self) -> None: if r.status not in [200, 202, 204]: await self._raise_error("Error deleting lab", r) + # Wait for the lab to actually go away. If we don't do this, we may + # try to create a new lab while the old one is still shutting down. 
+ count = 0 + while await self.is_lab_running() and count < 10: + self.log.info(f"Waiting for lab deletion ({count}s elapsed)") + await asyncio.sleep(1) + count += 1 + if await self.is_lab_running(): + self.log.warning("Giving up on waiting for lab deletion") + else: + self.log.info("Lab deleted") + async def create_labsession( self, kernel_name: str = "LSST", notebook_name: Optional[str] = None ) -> JupyterLabSession: diff --git a/src/mobu/monkey.py b/src/mobu/monkey.py index 452d0ac7..174fec6a 100644 --- a/src/mobu/monkey.py +++ b/src/mobu/monkey.py @@ -40,10 +40,12 @@ def __init__( user: AuthenticatedUser, session: ClientSession, ): + self.config = monkey_config self.name = monkey_config.name self.state = MonkeyState.IDLE self.user = user self.restart = monkey_config.restart + self.business_type = business_type self._session = session self._logfile = NamedTemporaryFile() @@ -64,9 +66,7 @@ def __init__( logger.info(f"Starting new file logger {self._logfile.name}") self.log = structlog.wrap_logger(logger) - self.business = business_type( - self.log, monkey_config.options, self.user - ) + self.business = business_type(self.log, self.config.options, self.user) async def alert(self, msg: str) -> None: if self.state in (MonkeyState.STOPPING, MonkeyState.FINISHED): @@ -113,21 +113,33 @@ async def _runner(self) -> None: # Just pass the exception message - the callstack will # be logged but will probably be too spammy to report. await self.alert(str(e)) + await self.business.close() run = self.restart and self.state == MonkeyState.RUNNING if self.state == MonkeyState.RUNNING: self.state = MonkeyState.ERROR if run: await asyncio.sleep(60) + # Recreate the business since we will have closed global + # resources when it aborted with an error. 
+ self.business = self.business_type( + self.log, self.config.options, self.user + ) + self.state = MonkeyState.FINISHED async def stop(self) -> None: if self.state == MonkeyState.FINISHED: return - self.state = MonkeyState.STOPPING - await self.business.stop() - if self._job: - await self._job.wait() + elif self.state == MonkeyState.RUNNING: + self.state = MonkeyState.STOPPING + await self.business.stop() + if self._job: + await self._job.wait() + elif self.state == MonkeyState.ERROR: + await self.business.close() + if self._job: + await self._job.close() self.state = MonkeyState.FINISHED def dump(self) -> MonkeyData: diff --git a/tests/business/jupyterloginloop_test.py b/tests/business/jupyterloginloop_test.py index e9928e33..86a4a0ac 100644 --- a/tests/business/jupyterloginloop_test.py +++ b/tests/business/jupyterloginloop_test.py @@ -114,3 +114,33 @@ async def test_reuse_lab( # Check that the lab is still running between iterations. assert jupyter.state["testuser1"] == JupyterState.LAB_RUNNING + + +@pytest.mark.asyncio +async def test_delayed_lab_delete( + client: AsyncClient, jupyter: MockJupyter, mock_aioresponses: aioresponses +) -> None: + mock_gafaelfawr(mock_aioresponses) + + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 5, + "user_spec": {"username_prefix": "testuser", "uid_start": 1000}, + "scopes": ["exec:notebook"], + "options": { + "settle_time": 0, + "login_idle_time": 0, + "delete_lab": False, + }, + "business": "JupyterLoginLoop", + }, + ) + assert r.status_code == 201 + + # End the test without shutting anything down. The test asgi-lifespan + # wrapper has a shutdown timeout of ten seconds and delete will take + # five seconds, so the test is that everything shuts down cleanly without + # throwing exceptions. 
+ jupyter.delete_immediate = False diff --git a/tests/conftest.py b/tests/conftest.py index a31905ce..432a03a0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -52,8 +52,11 @@ async def app(jupyter: MockJupyter) -> AsyncIterator[FastAPI]: This must depend on the Jupyter mock since otherwise the JupyterClient mocking is undone before the app is shut down, which causes it to try to make real web socket calls. + + A tests in business/jupyterloginloop_test.py depends on the exact shutdown + timeout. """ - async with LifespanManager(main.app): + async with LifespanManager(main.app, shutdown_timeout=10): yield main.app diff --git a/tests/support/jupyter.py b/tests/support/jupyter.py index 5d159fe5..1fc2d560 100644 --- a/tests/support/jupyter.py +++ b/tests/support/jupyter.py @@ -4,6 +4,7 @@ import re from base64 import urlsafe_b64decode +from datetime import datetime, timedelta, timezone from enum import Enum from typing import TYPE_CHECKING from uuid import uuid4 @@ -15,7 +16,7 @@ if TYPE_CHECKING: from re import Pattern - from typing import Any, Dict, Union + from typing import Any, Dict, Optional, Union from aioresponses import aioresponses @@ -46,6 +47,8 @@ class MockJupyter: def __init__(self) -> None: self.sessions: Dict[str, JupyterLabSession] = {} self.state: Dict[str, JupyterState] = {} + self.delete_immediate = True + self._delete_at: Dict[str, Optional[datetime]] = {} def login(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) @@ -65,6 +68,11 @@ def hub(self, url: str, **kwargs: Any) -> CallbackResult: redirect_to = _url(f"hub/spawn-pending/{user}") elif state == JupyterState.LAB_RUNNING: redirect_to = _url(f"user/{user}/lab") + delete_at = self._delete_at.get(user) + if delete_at and datetime.now(tz=timezone.utc) > delete_at: + del self._delete_at[user] + self.state[user] = JupyterState.LOGGED_IN + redirect_to = _url("hub/spawn") return CallbackResult(status=307, headers={"Location": 
redirect_to}) def spawn(self, url: str, **kwargs: Any) -> CallbackResult: @@ -99,7 +107,11 @@ def delete_lab(self, url: str, **kwargs: Any) -> CallbackResult: assert str(url).endswith(f"/users/{user}/server") state = self.state.get(user, JupyterState.LOGGED_OUT) assert state != JupyterState.LOGGED_OUT - self.state[user] = JupyterState.LOGGED_IN + if self.delete_immediate: + self.state[user] = JupyterState.LOGGED_IN + else: + now = datetime.now(tz=timezone.utc) + self._delete_at[user] = now + timedelta(seconds=5) return CallbackResult(status=202) def create_session(self, url: str, **kwargs: Any) -> CallbackResult: @@ -125,10 +137,8 @@ def create_session(self, url: str, **kwargs: Any) -> CallbackResult: def delete_session(self, url: str, **kwargs: Any) -> CallbackResult: user = self._get_user(kwargs["headers"]["Authorization"]) - session = self.sessions[user] - assert str(url).endswith( - f"/user/{user}/api/sessions/{session.session_id}" - ) + session_id = self.sessions[user].session_id + assert str(url).endswith(f"/user/{user}/api/sessions/{session_id}") state = self.state.get(user, JupyterState.LOGGED_OUT) assert state == JupyterState.LAB_RUNNING del self.sessions[user] From 826e8c1e29a17a949d549ac7a80ecab6397ad568 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 17:04:13 -0700 Subject: [PATCH 12/14] Improve is_lab_running Look explicitly for the three URLs that we expect and alert if we see a different URL. Do not attempt to delete the lab if it is not running. 
--- src/mobu/jupyterclient.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/mobu/jupyterclient.py b/src/mobu/jupyterclient.py index 5f9a6a70..6caa2c6c 100644 --- a/src/mobu/jupyterclient.py +++ b/src/mobu/jupyterclient.py @@ -159,22 +159,30 @@ async def lab_login(self) -> None: await self._raise_error("Error logging into lab", r) async def is_lab_running(self) -> bool: - self.log.info("Is lab running?") hub_url = self.jupyter_url + "hub" + spawn_url = self.jupyter_url + "hub/spawn" + spawn_pending_url = ( + self.jupyter_url + f"hub/spawn-pending/{self.user.username}" + ) + lab_url = self.jupyter_url + f"user/{self.user.username}/lab" + async with self.session.get(hub_url) as r: if r.status != 200: self.log.error(f"Error {r.status} from {r.url}") return False - spawn_url = self.jupyter_url + "hub/spawn" - spawn_pending_url = ( - self.jupyter_url + f"hub/spawn-pending/{self.user.username}" - ) - self.log.info(f"Going to {hub_url} redirected to {r.url}") if str(r.url) in [spawn_url, spawn_pending_url]: + self.log.info("Lab is not currently running") + return False + elif str(r.url) == lab_url: + self.log.info("Lab is currently running") + return True + else: + self.log.warning( + f"Going to {hub_url} redirected to unexpected URL {r.url}" + ) + self.log.info("Assuming lab is not running") return False - - return True async def spawn_lab(self) -> None: spawn_url = self.jupyter_url + "hub/spawn" @@ -219,13 +227,14 @@ async def spawn_lab(self) -> None: raise Exception("Giving up waiting for lab to spawn!") async def delete_lab(self) -> None: - headers = {"Referer": self.jupyter_url + "hub/home"} + if not await self.is_lab_running(): + return server_url = ( self.jupyter_url + f"hub/api/users/{self.user.username}/server" ) self.log.info(f"Deleting lab for {self.user.username} at {server_url}") - + headers = {"Referer": self.jupyter_url + "hub/home"} async with self.session.delete(server_url, headers=headers) as r: if 
r.status not in [200, 202, 204]: await self._raise_error("Error deleting lab", r) From 64a10541412ec041c5556410a4d7f4fabecedfb2 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 28 Jul 2021 17:31:39 -0700 Subject: [PATCH 13/14] Fix documentation for settle_time --- src/mobu/models/business.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/mobu/models/business.py b/src/mobu/models/business.py index 398c25f4..f6ce7ba9 100644 --- a/src/mobu/models/business.py +++ b/src/mobu/models/business.py @@ -52,8 +52,7 @@ class BusinessConfig(BaseModel): 10, title="How long to wait after lab creation in seconds", description=( - "Only used by the NotebookRunner. It will wait for this long" - " after lab creation before trying to create a session." + "Wait this long after lag creation before trying to use the lab" ), example=10, ) From 500c8f0f3ea229a7900a279caa9d79b5a1c8eb73 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Thu, 29 Jul 2021 13:18:59 -0700 Subject: [PATCH 14/14] Fix some comments in the mobu models --- src/mobu/models/business.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/mobu/models/business.py b/src/mobu/models/business.py index f6ce7ba9..200a6112 100644 --- a/src/mobu/models/business.py +++ b/src/mobu/models/business.py @@ -52,7 +52,7 @@ class BusinessConfig(BaseModel): 10, title="How long to wait after lab creation in seconds", description=( - "Wait this long after lag creation before trying to use the lab" + "Wait this long after lab creation before trying to use the lab" ), example=10, ) @@ -101,7 +101,13 @@ class BusinessConfig(BaseModel): description=( "For JupyterPythonLoop, this is the number of code snippets to" " execute before restarting the lab. For NotebookRunner, it's" - " the number of notebooks." + " the number of complete notebooks. 
NotebookRunner goes through" + " the directory of notebooks one-by-one, running the entirety" + " of each one and starting again at the beginning of the list" + " when it runs out, until it has executed a total of" + " max_executions notebooks. It then closes the session (and" + " optionally deletes and recreates the lab, controlled by" + " delete_lab), and then picks up where it left off." ), example=25, )