Skip to content

Commit

Permalink
Merge pull request #74 from lsst-sqre/tickets/DM-31201
Browse files Browse the repository at this point in the history
[DM-31201] Clean shutdown and other fixes
  • Loading branch information
rra authored Jul 29, 2021
2 parents 784a893 + 500c8f0 commit a62fc29
Show file tree
Hide file tree
Showing 17 changed files with 505 additions and 299 deletions.
93 changes: 87 additions & 6 deletions src/mobu/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from __future__ import annotations

import asyncio
from asyncio import Queue, QueueEmpty, TimeoutError
from enum import Enum
from typing import TYPE_CHECKING

from ..models.business import BusinessData
Expand All @@ -17,9 +19,30 @@
__all__ = ["Business"]


class BusinessCommand(Enum):
"""Commands sent over the internal control queue."""

# Put on the control queue by ``Business.stop`` to request that the
# background task stop.
STOP = "STOP"


class Business:
"""Base class for monkey business (one type of repeated operation).
The basic flow for a monkey business is as follows:
- Run ``startup``
- In a loop, run ``execute`` followed by ``idle`` until told to stop
- When told to stop, run ``shutdown``
Subclasses should override ``startup``, ``execute``, and ``shutdown`` to
add appropriate behavior. ``idle`` by default waits for ``idle_time``,
which generally does not need to be overridden. Subclasses should also
override ``close`` to shut down any object state created in ``__init__``.
All delays should be done by calling ``pause``, and the caller should
check ``self.stopping`` and exit any loops if it is `True` after calling
``pause``.
Parameters
----------
logger : `structlog.BoundLogger`
Expand All @@ -42,17 +65,75 @@ def __init__(
self.success_count = 0
self.failure_count = 0
self.timings = Timings()
self.control: Queue[BusinessCommand] = Queue()
self.stopping = False

async def close(self) -> None:
"""Clean up any business state on shutdown.

The base implementation does nothing. Subclasses override this to
shut down any object state they created in ``__init__``.
"""
pass

async def run(self) -> None:
while True:
self.logger.info("Idling...")
with self.timings.start("idle"):
await asyncio.sleep(5)
self.success_count += 1
"""The core business logic, run in a background task."""
self.logger.info("Starting up...")
try:
await self.startup()

async def stop(self) -> None:
while not self.stopping:
self.logger.info("Starting next iteration")
try:
await self.execute()
self.success_count += 1
except Exception:
self.failure_count += 1
raise
await self.idle()

self.logger.info("Shutting down...")
await self.shutdown()
await self.close()
finally:
# Tell the control channel we've processed the stop command.
if self.stopping:
self.control.task_done()

async def startup(self) -> None:
"""Run before the start of the first iteration and then not again.

The base implementation does nothing; subclasses override it to do
any one-time setup.
"""
pass

async def execute(self) -> None:
"""The business done in each loop.

The base implementation does nothing; subclasses override it with the
work to repeat on every iteration.
"""
pass

async def idle(self) -> None:
"""The idle pause at the end of each loop.

Waits for up to ``config.idle_time`` by calling ``pause``, inside the
``idle`` timing context.
"""
self.logger.info("Idling...")
with self.timings.start("idle"):
# Use ``pause`` rather than a plain sleep so that the wait is done
# through the control queue.
await self.pause(self.config.idle_time)

async def shutdown(self) -> None:
"""Any cleanup to do before exiting after stopping.

The base implementation does nothing; subclasses override it as
needed.
"""
pass

async def stop(self) -> None:
"""Tell the running background task to stop and wait for that.

Sets ``stopping``, puts `BusinessCommand.STOP` on the control queue so
that a task waiting in ``pause`` wakes up, waits for the queue to be
joined, and then calls ``close``.
"""
# Set the flag before queuing the command so that any later call to
# ``pause`` returns immediately.
self.stopping = True
await self.control.put(BusinessCommand.STOP)
# NOTE(review): ``join`` only returns once ``task_done`` has been called
# for the command. ``run`` appears to call ``task_done`` from a
# ``finally`` block only when ``stopping`` is already set, so if the
# background task exited (for example, on an exception) before ``stop``
# was called, this may wait forever -- confirm how callers handle that.
await self.control.join()
self.logger.info("Stopped")
# NOTE(review): ``run`` appears to call ``close`` as well after
# ``shutdown``, so ``close`` may run twice; presumably it must be safe
# to call more than once -- confirm.
await self.close()

async def pause(self, seconds: float) -> None:
    """Pause for up to the number of seconds, handling commands.

    Returns immediately if the business is already stopping. Otherwise,
    waits on the control queue and returns as soon as a command arrives
    or the time runs out, whichever happens first. The command itself is
    discarded; callers are expected to check ``self.stopping`` after this
    method returns.

    Parameters
    ----------
    seconds : `float`
        Maximum time to wait. Zero or a negative value means do not wait
        at all and only consume a command that is already queued.
    """
    if self.stopping:
        return
    try:
        if seconds and seconds > 0:
            await asyncio.wait_for(self.control.get(), seconds)
        else:
            # Nothing to wait for. Handle negative values here as well
            # rather than passing them to ``wait_for``, which would
            # create a task only to cancel it immediately and would
            # leave an already-queued command unread.
            self.control.get_nowait()
    except (TimeoutError, QueueEmpty):
        return

def dump(self) -> BusinessData:
return BusinessData(
name=type(self).__name__,
Expand Down
12 changes: 7 additions & 5 deletions src/mobu/business/jupyterjitterloginloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from __future__ import annotations

import asyncio
import random

from .jupyterloginloop import JupyterLoginLoop
Expand All @@ -20,14 +19,17 @@ class JupyterJitterLoginLoop(JupyterLoginLoop):

async def startup(self) -> None:
with self.timings.start("pre_login_delay"):
await asyncio.sleep(random.uniform(0, 30))
await self.pause(random.uniform(0, 30))
if self.stopping:
return
await super().startup()
await asyncio.sleep(random.uniform(10, 30))
await self.pause(random.uniform(10, 30))

async def lab_business(self) -> None:
with self.timings.start("lab_wait"):
await asyncio.sleep(1200 + random.uniform(0, 600))
await self.pause(1200 + random.uniform(0, 600))

async def idle(self) -> None:
self.logger.info("Idling...")
with self.timings.start("idle"):
await asyncio.sleep(30 + random.uniform(0, 60))
await self.pause(30 + random.uniform(0, 60))
109 changes: 44 additions & 65 deletions src/mobu/business/jupyterloginloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING

from ..jupyterclient import JupyterClient
Expand All @@ -25,6 +23,15 @@
class JupyterLoginLoop(Business):
"""Business that logs on to the hub, creates a lab, and deletes it.
This class modifies the core `~mobu.business.base.Business` loop by
providing the overall ``execute`` framework and default ``startup`` and
``shutdown`` methods. It will log on to JupyterHub, ensure no lab
currently exists, create a lab, run ``lab_business``, and then shut down
the lab before starting another iteration.
Subclasses should override ``lab_business`` to do whatever they want to do
inside a lab. The default behavior just waits for ``login_idle_time``.
Once this business has been stopped, it cannot be started again (the
`aiohttp.ClientSession` will be closed), and the instance should be
dropped after retrieving any status information.
Expand All @@ -38,87 +45,59 @@ def __init__(
) -> None:
super().__init__(logger, business_config, user)
self._client = JupyterClient(user, logger, business_config)
self._last_login = datetime.fromtimestamp(0, tz=timezone.utc)

async def run(self) -> None:
self.logger.info("Starting up...")
await self.startup()
while True:
await self.reauth_if_needed()
self.logger.info("Starting next iteration")
try:
await self.ensure_lab()
await self.lab_business()
await self.delete_lab()
self.success_count += 1
except Exception:
self.failure_count += 1
raise
await self.lab_idle()

async def close(self) -> None:
"""Close the underlying `JupyterClient`."""
await self._client.close()

async def startup(self) -> None:
"""Run before the start of the first iteration and then not again."""
await self.hub_login()
await self.initial_delete_lab()

async def execute(self) -> None:
"""The work done in each iteration of the loop.

Ensures a lab exists, waits for it to settle, and runs
``lab_business``. Then, if ``config.delete_lab`` is set, logs in to the
hub again and deletes the lab.
"""
await self.ensure_lab()
await self.lab_settle()
# ``lab_settle`` waits via ``pause``, which can return early when a stop
# has been requested, so check before starting the lab business.
if self.stopping:
return
await self.lab_business()
if self.config.delete_lab:
# NOTE(review): presumably the hub login is refreshed here because
# ``lab_business`` may have run for a long time -- confirm.
await self.hub_login()
await self.delete_lab()

async def shutdown(self) -> None:
"""Delete the lab as cleanup after the business has been stopped."""
await self.delete_lab()

async def hub_login(self) -> None:
self.logger.info("Logging in to hub")
with self.timings.start("hub_login"):
await self._client.hub_login()
self._last_login = self._now()

async def ensure_lab(self) -> None:
with self.timings.start("ensure_lab"):
await self._client.ensure_lab()
self.logger.info("Lab created.")
self.logger.info("Lab created")

async def lab_settle(self) -> None:
"""Wait for up to ``config.settle_time`` via ``pause``.

Called by ``execute`` after ``ensure_lab`` and before ``lab_business``;
the wait happens inside the ``lab_settle`` timing context.
"""
with self.timings.start("lab_settle"):
await self.pause(self.config.settle_time)

async def delete_lab(self) -> None:
self.logger.info("Deleting lab.")
self.logger.info("Deleting lab")
with self.timings.start("delete_lab"):
await self._client.delete_lab()
self.logger.info("Lab successfully deleted.")
self.logger.info("Lab successfully deleted")

async def initial_delete_lab(self) -> None:
"""Ask the client to delete the lab during ``startup``.

Makes the same client call as ``delete_lab`` but logs a different
message and uses its own ``initial_delete_lab`` timing context.
"""
self.logger.info("Deleting any existing lab")
with self.timings.start("initial_delete_lab"):
await self._client.delete_lab()

async def lab_business(self) -> None:
"""Do whatever business we want to do inside a lab.
Placeholder function intended to be overridden by subclasses.
Placeholder function intended to be overridden by subclasses. The
default behavior is to wait a minute and then shut the lab down
again.
"""
with self.timings.start("lab_wait"):
await asyncio.sleep(5)

async def lab_idle(self) -> None:
"""Executed at the end of each iteration for a given lab.
Intended to be overridden by subclasses if they want different idle
behavior.
"""
delay = self.config.lab_idle_time
if delay > 0:
with self.timings.start("idle"):
await asyncio.sleep(delay)

async def execution_idle(self) -> None:
"""Executed between each unit of work execution (usually a Lab
cell).
"""
delay = self.config.execution_idle_time
if delay > 0:
with self.timings.start("execution_idle"):
await asyncio.sleep(self.config.execution_idle_time)

def _now(self) -> datetime:
return datetime.now(timezone.utc)

async def reauth_if_needed(self) -> None:
now = self._now()
elapsed = now - self._last_login
if elapsed > timedelta(self.config.reauth_interval):
await self.hub_reauth()

async def hub_reauth(self) -> None:
self.logger.info("Reauthenticating to Hub")
with self.timings.start("hub_reauth"):
await self._client.hub_login()

async def stop(self) -> None:
with self.timings.start("delete_lab_on_stop"):
await self._client.delete_lab()
await self._client.close()
await self.pause(self.config.login_idle_time)
38 changes: 26 additions & 12 deletions src/mobu/business/jupyterpythonloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@


class JupyterPythonLoop(JupyterLoginLoop):
"""Run simple Python code in a loop inside a lab kernel."""
"""Run simple Python code in a loop inside a lab kernel.
This can be used as a base class for other JupyterLab code execution
monkey business. Override ``execute_code`` to change what code is
executed. When doing so, be sure to call ``execution_idle`` between each
code execution and check ``self.stopping`` after it returns, exiting any
loops if ``self.stopping`` is true.
"""

async def lab_business(self) -> None:
await self.reauth_if_needed()
if self.stopping:
return
session = await self.create_session()
for count in range(self.config.max_executions):
await self.execute_code(session, self.config.code)
await self.execution_idle()
await self.execute_code(session)
await self.delete_session(session)

async def create_session(self) -> JupyterLabSession:
Expand All @@ -27,13 +33,21 @@ async def create_session(self) -> JupyterLabSession:
session = await self._client.create_labsession()
return session

async def execute_code(
self, session: JupyterLabSession, code: str
) -> None:
with self.timings.start("execute_code", {"code": code}) as sw:
reply = await self._client.run_python(session, code)
sw.annotation["result"] = reply
self.logger.info(f"{code} -> {reply}")
async def execute_code(self, session: JupyterLabSession) -> None:
code = self.config.code
for count in range(self.config.max_executions):
with self.timings.start("execute_code", {"code": code}) as sw:
reply = await self._client.run_python(session, code)
sw.annotation["result"] = reply
self.logger.info(f"{code} -> {reply}")
await self.execution_idle()
if self.stopping:
break

async def execution_idle(self) -> None:
"""Executed between each unit of work execution.

Waits for up to ``config.execution_idle_time`` via ``pause``, inside
the ``execution_idle`` timing context. Callers should check
``self.stopping`` after it returns.
"""
with self.timings.start("execution_idle"):
await self.pause(self.config.execution_idle_time)

async def delete_session(self, session: JupyterLabSession) -> None:
self.logger.info("delete_session")
Expand Down
Loading

0 comments on commit a62fc29

Please sign in to comment.