Skip to content

Commit

Permalink
Merge pull request #73 from lsst-sqre/tickets/DM-31002-refactor
Browse files Browse the repository at this point in the history
Use JupyterLab sessions rather than kernels
  • Loading branch information
athornton authored Jul 27, 2021
2 parents bbaa2cb + a128f4e commit 784a893
Show file tree
Hide file tree
Showing 15 changed files with 313 additions and 197 deletions.
34 changes: 17 additions & 17 deletions requirements/main.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,23 @@ asgiref==3.4.1 \
--hash=sha256:4ef1ab46b484e3c706329cedeff284a5d40824200638503f5768edb6de7d58e9 \
--hash=sha256:ffc141aa908e6f175673e7b1b3b7af4fdb0ecb738fc5c8b88f69f055c2415214
# via uvicorn
astropy==4.2.1 \
--hash=sha256:009a26f795adad1f0b26ba3a434e5be9cfa82cb629ba87c0547b567bad6e1695 \
--hash=sha256:03428ca1baa4fba99e37d3767c12c038c456a27176bcb8f407f9b2b0743ef8ee \
--hash=sha256:09965d5e8ffd7e96e7fcc596b631f366cf729df75efb792918cb6637acf1ad4e \
--hash=sha256:12c76c119f7a0a8fcf0e72269be9faa88319f12e4ba346180d910e58fda36bf2 \
--hash=sha256:2035ca521d86c88ea6d8da07f977a9727f0d7d8f85b5c287558c1891f885e548 \
--hash=sha256:223610cc612aaddac890fefd9e20dc6a39c92ef01692354e2bcb273c79fb8842 \
--hash=sha256:2d4d328892c7b09a23361f44182cf89be3dadaec60a270bd4fe754f3829052a4 \
--hash=sha256:2ff194e15b03afd575f278e2187b71d7ee9d85f302356050b2257b6c4788f1cc \
--hash=sha256:3d5516ba20e6cbc208250dd8f414243839cc40e957616e3f336a517967ee34d0 \
--hash=sha256:5be2f01d1b35202c0989f4502d25fe850ae5e891acbd3be107eaf6eeab81826d \
--hash=sha256:6d8c8bc1eef048ad873395d2a620b9b5f308bef9a508f542e6dc3b33fbfbe66d \
--hash=sha256:a1f707283822c2f7df97d9de151c29d49ed9cc0bf3ae952f91012d7a4c5872a7 \
--hash=sha256:a6164013de3732a67a5a1a2743565f5aaef0d895ce33d5aef482d88b05318893 \
--hash=sha256:c327cfdede8d5fce1224153b8d3a060226161ddc2e1b2170f076aaddb4953965 \
--hash=sha256:db694c10eb3cc10068859ba1eab30b38b7e821dbfff142960c5a99c4af059747 \
--hash=sha256:ed483e472241153daec45f4b0c318c2c63d9f47305b78e6e63d32fc388c18427
astropy==4.3.post1 \
--hash=sha256:1b9f976f32675ae715bd38fcfb3540fcf01f6be1d75266c39b6fb085adac9410 \
--hash=sha256:1d879b91bbd61d69682c72b773d9c8851eff23e51cf1fb86b32dc80e783be643 \
--hash=sha256:460e07cb72789976421689833556a0a364dba89e25fe9183e669702dbcbf72c2 \
--hash=sha256:52c1038bad27e7ae1154eda0447ba790f13cb9bcb98e04c60493c2e547a87e6c \
--hash=sha256:607739958f8fcc37d4f2efdb8b9bbcc3bf063c880e2d83104cf72a81c3e89b91 \
--hash=sha256:6779cc4c840128d12a6721d6fc835c826646967b019daaeacc6e71ed79850973 \
--hash=sha256:6ac307416a8718e4f4ed42f42c14587754fe6df38d822533c6d795db64a29453 \
--hash=sha256:8062ece72d6fe6f4349219c9e2e0f213847de4f7f3bc3796ce7949bc1a202bf9 \
--hash=sha256:8d70811b68f702d7b2e21fc0d644f88baf977482b1f915011460cc1b7fa285ea \
--hash=sha256:a1b57aa21fdc8526dbca96b69cdcaba83066d7688ff52b87b7cdf5362e75f112 \
--hash=sha256:a420917baf9e02252e3477da265675167a324716f8a09e78607577b5bd27a159 \
--hash=sha256:be62ba888f0dd82a7265698a602e64ca2466f1f17bd6c4ef085a055c5968e458 \
--hash=sha256:dc8acf92a20c9c916f25b17dbb0f618f91751941ec36f240452871e72fb608ca \
--hash=sha256:e2670fc08868cd8d0d1e653dcfcf224b506235b30434742538702f74c6859d24 \
--hash=sha256:e95ddb6fb0850fe28690e498fe8d2c3cee72d54c098083a2fe0e01b7b3895771 \
--hash=sha256:ffe13ed14298e819f2be5224aa649c44f8ddc9fcf32b5d4296fc62624ae867cd
# via pyvo
async-timeout==3.0.1 \
--hash=sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f \
Expand Down
41 changes: 35 additions & 6 deletions src/mobu/business/jupyterloginloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING

from ..jupyterclient import JupyterClient
Expand Down Expand Up @@ -37,11 +38,13 @@ def __init__(
) -> None:
super().__init__(logger, business_config, user)
self._client = JupyterClient(user, logger, business_config)
self._last_login = datetime.fromtimestamp(0, tz=timezone.utc)

async def run(self) -> None:
self.logger.info("Starting up...")
await self.startup()
while True:
await self.reauth_if_needed()
self.logger.info("Starting next iteration")
try:
await self.ensure_lab()
Expand All @@ -51,7 +54,7 @@ async def run(self) -> None:
except Exception:
self.failure_count += 1
raise
await self.idle()
await self.lab_idle()

async def startup(self) -> None:
"""Run before the start of the first iteration and then not again."""
Expand All @@ -60,6 +63,7 @@ async def startup(self) -> None:
async def hub_login(self) -> None:
with self.timings.start("hub_login"):
await self._client.hub_login()
self._last_login = self._now()

async def ensure_lab(self) -> None:
with self.timings.start("ensure_lab"):
Expand All @@ -78,16 +82,41 @@ async def lab_business(self) -> None:
Placeholder function intended to be overridden by subclasses.
"""
with self.timings.start("lab_wait"):
await asyncio.sleep(60)
await asyncio.sleep(5)

async def idle(self) -> None:
"""Executed at the end of each iteration.
async def lab_idle(self) -> None:
"""Executed at the end of each iteration for a given lab.
Intended to be overridden by subclasses if they want different idle
behavior.
"""
with self.timings.start("idle"):
await asyncio.sleep(60)
delay = self.config.lab_idle_time
if delay > 0:
with self.timings.start("idle"):
await asyncio.sleep(delay)

async def execution_idle(self) -> None:
"""Executed between each unit of work execution (usually a Lab
cell).
"""
delay = self.config.execution_idle_time
if delay > 0:
with self.timings.start("execution_idle"):
await asyncio.sleep(self.config.execution_idle_time)

def _now(self) -> datetime:
return datetime.now(timezone.utc)

async def reauth_if_needed(self) -> None:
now = self._now()
elapsed = now - self._last_login
if elapsed > timedelta(self.config.reauth_interval):
await self.hub_reauth()

async def hub_reauth(self) -> None:
self.logger.info("Reauthenticating to Hub")
with self.timings.start("hub_reauth"):
await self._client.hub_login()

async def stop(self) -> None:
with self.timings.start("delete_lab_on_stop"):
Expand Down
49 changes: 22 additions & 27 deletions src/mobu/business/jupyterpythonloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,38 @@
over again.
"""

import asyncio

from ..jupyterclient import JupyterLabSession
from .jupyterloginloop import JupyterLoginLoop

__all__ = ["JupyterPythonLoop"]

MAX_EXECUTIONS = 20
SLEEP_TIME = 1


class JupyterPythonLoop(JupyterLoginLoop):
"""Run simple Python code in a loop inside a lab kernel."""

async def lab_business(self) -> None:
kernel = await self.create_kernel()
for count in range(MAX_EXECUTIONS):
await self.execute_code(kernel, "print(2+2, end='')")
await self.lab_wait()
await self.delete_kernel(kernel)

async def create_kernel(self) -> str:
self.logger.info("create_kernel")
with self.timings.start("create_kernel"):
kernel = await self._client.create_kernel()
return kernel

async def execute_code(self, kernel: str, code: str) -> None:
await self.reauth_if_needed()
session = await self.create_session()
for count in range(self.config.max_executions):
await self.execute_code(session, self.config.code)
await self.execution_idle()
await self.delete_session(session)

async def create_session(self) -> JupyterLabSession:
self.logger.info("create_session")
with self.timings.start("create_session"):
session = await self._client.create_labsession()
return session

async def execute_code(
self, session: JupyterLabSession, code: str
) -> None:
with self.timings.start("execute_code", {"code": code}) as sw:
reply = await self._client.run_python(kernel, code)
reply = await self._client.run_python(session, code)
sw.annotation["result"] = reply
self.logger.info(f"{code} -> {reply}")

async def lab_wait(self) -> None:
with self.timings.start("lab_wait"):
await asyncio.sleep(SLEEP_TIME)

async def delete_kernel(self, kernel: str) -> None:
self.logger.info("delete_kernel")
with self.timings.start("delete_kernel"):
await self._client.delete_kernel(kernel)
async def delete_session(self, session: JupyterLabSession) -> None:
self.logger.info("delete_session")
with self.timings.start("delete_session"):
await self._client.delete_labsession(session)
57 changes: 22 additions & 35 deletions src/mobu/business/notebookrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
import asyncio
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING

import git

from ..jupyterclient import NotebookException
from ..jupyterclient import JupyterLabSession, NotebookException
from ..models.business import BusinessData
from .jupyterloginloop import JupyterLoginLoop

Expand Down Expand Up @@ -44,7 +43,6 @@ def __init__(
self.notebook: Optional[os.DirEntry] = None
self.running_code: Optional[str] = None
self._failed_notebooks: List[str] = []
self._last_login = datetime.fromtimestamp(0, tz=timezone.utc)
self._repo_dir = TemporaryDirectory()
self._repo: Optional[git.Repo] = None
self._notebook_iterator: Optional[Iterator[os.DirEntry]] = None
Expand Down Expand Up @@ -80,7 +78,6 @@ async def startup(self) -> None:
self._notebook_iterator = os.scandir(self._repo_dir.name)
self.logger.info("Repository cloned and ready")
await super().startup()
self._last_login = self._now()
await self.initial_delete_lab()

def clone_repo(self) -> None:
Expand All @@ -100,23 +97,23 @@ async def lab_business(self) -> None:

await self.ensure_lab()
await self.lab_settle()
kernel = await self.create_kernel()
session = await self.create_session()

self.logger.info(f"Starting notebook: {self.notebook.name}")
cells = self.read_notebook(self.notebook.name, self.notebook.path)
for count in range(self.config.notebook_iterations):
iteration = f"{count + 1}/{self.config.notebook_iterations}"
msg = f"Notebook '{self.notebook.name}' iteration {iteration}"
self.logger.info(msg)

await self._reauth_if_needed()
await self.reauth_if_needed()

for cell in cells:
self.running_code = "".join(cell["source"])
await self.execute_code(kernel, self.running_code)
await self.execute_code(session, self.running_code)
await self.execution_idle()

self.running_code = None
await self.delete_kernel(kernel)
await self.delete_session(session)
self.logger.info(f"Success running notebook: {self.notebook.name}")

async def lab_settle(self) -> None:
Expand All @@ -129,23 +126,28 @@ def read_notebook(self, name: str, path: str) -> List[Dict[str, Any]]:
cells = json.loads(notebook_text)["cells"]
return [c for c in cells if c["cell_type"] == "code"]

async def create_kernel(self) -> str:
self.logger.info("create_kernel")
with self.timings.start("create_kernel"):
kernel = await self._client.create_kernel()
return kernel
async def create_session(self) -> JupyterLabSession:
self.logger.info("create_session")
notebook_name = self.notebook.name if self.notebook else None
with self.timings.start("create_session"):
session = await self._client.create_labsession(
notebook_name=notebook_name,
)
return session

async def execute_code(self, kernel: str, code: str) -> None:
async def execute_code(
self, session: JupyterLabSession, code: str
) -> None:
self.logger.info("Executing:\n%s\n", code)
with self.timings.start("run_code", {"code": code}) as sw:
reply = await self._client.run_python(kernel, code)
reply = await self._client.run_python(session, code)
sw.annotation["result"] = reply
self.logger.info(f"Result:\n{reply}\n")

async def delete_kernel(self, kernel: str) -> None:
self.logger.info(f"Deleting kernel {kernel}")
with self.timings.start("delete_kernel"):
await self._client.delete_kernel(kernel)
async def delete_session(self, session: JupyterLabSession) -> None:
self.logger.info(f"Deleting session {session}")
with self.timings.start("delete_session"):
await self._client.delete_labsession(session)

def dump(self) -> BusinessData:
data = super().dump()
Expand All @@ -165,18 +167,3 @@ def _next_notebook(self) -> None:
)
self._notebook_iterator = os.scandir(self._repo_dir.name)
self._next_notebook()

def _now(self) -> datetime:
return datetime.now(timezone.utc)

async def _reauth_if_needed(self) -> None:
now = self._now()
elapsed = now - self._last_login
if elapsed > timedelta(minutes=45):
await self.hub_reauth()
self._last_login = now

async def hub_reauth(self) -> None:
self.logger.info("Reauthenticating to Hub")
with self.timings.start("hub_reauth"):
await self._client.hub_login()
Loading

0 comments on commit 784a893

Please sign in to comment.