Skip to content

Commit

Permalink
Merge pull request #78 from lsst-sqre/tickets/DM-31201
Browse files Browse the repository at this point in the history
[DM-31201] Use progress and user API for lab spawning
  • Loading branch information
rra authored Aug 10, 2021
2 parents f35def5 + feaa09b commit d6fe074
Show file tree
Hide file tree
Showing 17 changed files with 682 additions and 240 deletions.
6 changes: 3 additions & 3 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ typing-extensions==3.10.0.0 \
# -c requirements/main.txt
# aiohttp
# mypy
virtualenv==20.7.0 \
--hash=sha256:97066a978431ec096d163e72771df5357c5c898ffdd587048f45e0aecc228094 \
--hash=sha256:fdfdaaf0979ac03ae7f76d5224a05b58165f3c804f8aa633f3dd6f22fbd435d5
virtualenv==20.7.1 \
--hash=sha256:57bcb59c5898818bd555b1e0cfcf668bd6204bc2b53ad0e70a52413bd790f9e4 \
--hash=sha256:73863dc3be1efe6ee638e77495c0c195a6384ae7b15c561f3ceb2698ae7267c1
# via pre-commit
yarl==1.6.3 \
--hash=sha256:00d7ad91b6583602eb9c1d085a2cf281ada267e9a197e8b7cae487dadbfa293e \
Expand Down
6 changes: 3 additions & 3 deletions requirements/main.txt
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,9 @@ rfc3986[idna2008]==1.5.0 \
--hash=sha256:270aaf10d87d0d4e095063c65bf3ddbc6ee3d0b226328ce21e036f946e421835 \
--hash=sha256:a86d6e1f5b1dc238b218b012df0aa79409667bb209e58da56d0b94704e712a97
# via httpx
safir==2.0.1 \
--hash=sha256:3a0762484e2986026612947a311886b81b1f983345b5c86f6b2e0bb4a41165e2 \
--hash=sha256:fedc2a331a001262e7153c11540c0afe277b25ebd40295267b3154e82adf99e8
safir==2.1.0 \
--hash=sha256:4e8094f58f61d0cc2ee75e3c17651735350dfcda3e389986c887fb2c5be34fd2 \
--hash=sha256:e92b2e9226d185e34b11c18c00e38a1d9b362cbf1ba128379eab730cdfce4939
# via -r requirements/main.in
smmap==4.0.0 \
--hash=sha256:7e65386bd122d45405ddf795637b7f7d2b532e7e401d46bbe3fb49b9986d5182 \
Expand Down
41 changes: 41 additions & 0 deletions src/mobu/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@

import asyncio
from asyncio import Queue, QueueEmpty, TimeoutError
from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING

from ..models.business import BusinessData
from ..timings import Timings
from ..util import wait_first

if TYPE_CHECKING:
from typing import AsyncIterable, AsyncIterator, TypeVar

from structlog import BoundLogger

from ..models.business import BusinessConfig
from ..models.user import AuthenticatedUser

T = TypeVar("T")

__all__ = ["Business"]


Expand Down Expand Up @@ -134,6 +140,41 @@ async def pause(self, seconds: float) -> None:
except (TimeoutError, QueueEmpty):
return

async def iter_with_timeout(
    self, iterable: AsyncIterable[T], timeout: float
) -> AsyncIterator[T]:
    """Run an iterator with a timeout.

    Returns the next element of the iterator on success and ends the
    iterator on timeout or if the business was told to shut down.  (The
    latter two can be distinguished by checking ``self.stopping``.)

    Parameters
    ----------
    iterable
        Asynchronous iterable to read elements from.
    timeout
        Total time in seconds allowed for the whole iteration, measured
        from the first time the generator body runs.  This is a budget
        for all elements together, not a per-element limit.

    Yields
    ------
    T
        Each element produced by ``iterable`` before the timeout expires
        or the business starts stopping.  An element that is `None` ends
        the iteration, since it cannot be told apart from the pause
        finishing first.

    Notes
    -----
    This is unfortunately somewhat complex because we want to read from an
    iterator of messages (progress for spawn or WebSocket messages for
    code execution) while simultaneously checking our control queue for a
    shutdown message and imposing a timeout.

    Do this by creating two awaitables, one pause that handles the control
    queue and the timeout and the other that waits on the progress
    iterator, and then use the ``wait_first`` utility function to wait for
    the first one that finishes and abort the other one.
    """
    iterator = iterable.__aiter__()
    start = datetime.now(tz=timezone.utc)
    while True:
        now = datetime.now(tz=timezone.utc)
        remaining = timeout - (now - start).total_seconds()
        if remaining < 0:
            break

        # Wait only for what is left of the overall budget.  Pausing for
        # the full timeout on every pass would let an element that arrives
        # late restart the clock, so the total wait could approach twice
        # the requested timeout.
        pause_await = self.pause(remaining)
        iter_await = iterator.__anext__()
        result = await wait_first(iter_await, pause_await)

        # A result of None is treated as "the pause finished first"
        # (presumably because pause returns None -- confirm against
        # wait_first), which means timeout or a control message.
        if result is None or self.stopping:
            break
        yield result

def dump(self) -> BusinessData:
return BusinessData(
name=type(self).__name__,
Expand Down
98 changes: 83 additions & 15 deletions src/mobu/business/jupyterloginloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from ..constants import DATE_FORMAT
from ..exceptions import JupyterSpawnError, JupyterTimeoutError
from ..jupyterclient import JupyterClient
from .base import Business

Expand All @@ -17,7 +21,23 @@
from ..models.business import BusinessConfig
from ..user import AuthenticatedUser

__all__ = ["JupyterLoginLoop"]
__all__ = ["JupyterLoginLoop", "ProgressLogMessage"]


@dataclass(frozen=True)
class ProgressLogMessage:
    """One timestamped line of output from the lab spawn progress stream.

    Converting an instance to a string gives the timestamp formatted with
    ``DATE_FORMAT``, followed by ``" - "`` and the message text.
    """

    message: str
    """Text of the progress message."""

    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    """Time the message object was created (UTC unless given explicitly)."""

    def __str__(self) -> str:
        stamp = self.timestamp.strftime(DATE_FORMAT)
        return f"{stamp} - {self.message}"


class JupyterLoginLoop(Business):
Expand Down Expand Up @@ -51,14 +71,18 @@ async def close(self) -> None:

async def startup(self) -> None:
await self.hub_login()
await self.initial_delete_lab()
if not await self._client.is_lab_stopped():
await self.delete_lab()

async def execute(self) -> None:
"""The work done in each iteration of the loop."""
await self.ensure_lab()
await self.lab_settle()
if self.stopping:
return
if self.config.delete_lab or await self._client.is_lab_stopped():
await self.spawn_lab()
if self.stopping:
return
await self.lab_settle()
if self.stopping:
return
await self.lab_login()
await self.lab_business()
if self.config.delete_lab:
Expand All @@ -73,13 +97,42 @@ async def hub_login(self) -> None:
with self.timings.start("hub_login"):
await self._client.hub_login()

async def ensure_lab(self) -> None:
with self.timings.start("ensure_lab"):
await self._client.ensure_lab()
async def spawn_lab(self) -> None:
    """Spawn a lab and watch the progress API until it is ready.

    The whole operation is timed under the ``spawn_lab`` timing.  Returns
    normally once a progress message reports ``ready``, and also returns
    (without raising) whenever the business is stopping.

    Raises
    ------
    JupyterTimeoutError
        The progress stream ended without a ready message and the time
        elapsed for the whole operation is greater than the progress
        timeout (``spawn_timeout`` minus ``spawn_settle_time``).
    JupyterSpawnError
        The progress stream ended without a ready message within that
        time.
    """
    config = self.config
    with self.timings.start("spawn_lab") as stopwatch:
        await self._client.spawn_lab()

        # Give the hub a moment before attaching to the progress API;
        # attaching too early may miss the spawner and yield an
        # incomplete event stream.  The spawn is expected to take longer
        # than this pause anyway.
        await self.pause(config.spawn_settle_time)
        if self.stopping:
            return

        # Follow progress events, keeping a timestamped transcript, until
        # one reports that the lab is ready.
        budget = config.spawn_timeout - config.spawn_settle_time
        events = self._client.spawn_progress()
        transcript = []
        async for event in self.iter_with_timeout(events, budget):
            transcript.append(ProgressLogMessage(event.message))
            if event.ready:
                return

        # Reaching this point means the stream ended without a ready
        # event: the spawn failed, timed out, or we were told to stop.
        if self.stopping:
            return
        log = "\n".join(map(str, transcript))
        if stopwatch.elapsed.total_seconds() <= budget:
            raise JupyterSpawnError(self.user.username, log)
        elapsed = round(stopwatch.elapsed.total_seconds())
        msg = f"Lab did not spawn after {elapsed}s"
        raise JupyterTimeoutError(self.user.username, msg, log)

async def lab_settle(self) -> None:
with self.timings.start("lab_settle"):
await self.pause(self.config.settle_time)
await self.pause(self.config.lab_settle_time)

async def lab_login(self) -> None:
with self.timings.start("lab_login"):
Expand All @@ -89,12 +142,27 @@ async def delete_lab(self) -> None:
self.logger.info("Deleting lab")
with self.timings.start("delete_lab"):
await self._client.delete_lab()
self.logger.info("Lab successfully deleted")

async def initial_delete_lab(self) -> None:
self.logger.info("Deleting any existing lab")
with self.timings.start("initial_delete_lab"):
await self._client.delete_lab()
# If we're not stopping, wait for the lab to actually go away. If
# we don't do this, we may try to create a new lab while the old
# one is still shutting down.
if self.stopping:
return
timeout = self.config.delete_timeout
start = datetime.now(tz=timezone.utc)
while not await self._client.is_lab_stopped():
now = datetime.now(tz=timezone.utc)
elapsed = round((now - start).total_seconds())
if elapsed > timeout:
msg = f"Lab not deleted after {elapsed}s"
raise JupyterTimeoutError(self.user.username, msg)
msg = f"Waiting for lab deletion ({elapsed}s elapsed)"
self.logger.info(msg)
await self.pause(2)
if self.stopping:
return

self.logger.info("Lab successfully deleted")

async def lab_business(self) -> None:
"""Do whatever business we want to do inside a lab.
Expand Down
72 changes: 67 additions & 5 deletions src/mobu/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from aiohttp import ClientResponseError

from .constants import DATE_FORMAT

if TYPE_CHECKING:
Expand Down Expand Up @@ -158,6 +160,18 @@ def to_slack(self) -> Dict[str, Any]:
class JupyterError(SlackError):
"""Web error from JupyterHub or JupyterLab."""

@classmethod
def from_exception(
    cls, user: str, exc: ClientResponseError
) -> JupyterError:
    """Build the error from an aiohttp ``ClientResponseError``.

    Parameters
    ----------
    user
        Username to associate with the error.
    exc
        Exception whose request URL, request method, status, and message
        are copied into the new error.  No response body is passed.

    Returns
    -------
    JupyterError
        Instance of the class this method was called on.
    """
    request = exc.request_info
    return cls(
        user=user,
        url=str(request.url),
        method=request.method,
        status=exc.status,
        reason=exc.message,
    )

@classmethod
async def from_response(
cls, user: str, response: ClientResponse
Expand All @@ -179,7 +193,7 @@ def __init__(
status: int,
reason: Optional[str],
method: str,
body: str,
body: Optional[str] = None,
) -> None:
self.url = url
self.status = status
Expand All @@ -189,10 +203,13 @@ def __init__(
super().__init__(user, f"Status {status} from {method} {url}")

def __str__(self) -> str:
return (
result = (
f"{self.user}: status {self.status} ({self.reason}) from"
f" {self.method} {self.url}\nBody:\n{self.body}\n"
f" {self.method} {self.url}"
)
if self.body:
result += f"\nBody:\n{self.body}\n"
return result

def to_slack(self) -> Dict[str, Any]:
"""Format the error as a Slack Block Kit message."""
Expand All @@ -211,18 +228,63 @@ def to_slack(self) -> Dict[str, Any]:
}


class JupyterSpawnError(SlackError):
    """Raised when the Jupyter lab pod fails to spawn.

    Parameters
    ----------
    user
        Username passed through to the parent class.
    log
        Text of the spawn log, stored as ``self.log`` and included in the
        Slack message.
    """

    def __init__(self, user: str, log: str) -> None:
        super().__init__(user, "Spawning lab failed")
        self.log = log

    def to_slack(self) -> Dict[str, Any]:
        """Format the error as a Slack Block Kit message.

        The message has a fixed summary section, a section with the
        fields from ``self.common_fields()``, a section with the spawn
        log, and a closing divider.
        """
        summary = {
            "type": "section",
            "text": {"type": "mrkdwn", "text": "Spawning lab failed"},
        }
        details = {"type": "section", "fields": self.common_fields()}
        log_text = {
            "type": "mrkdwn",
            "text": f"*Log*\n{self.log}",
            "verbatim": True,
        }
        log_section = {"type": "section", "text": log_text}
        return {
            "blocks": [summary, details, log_section, {"type": "divider"}]
        }


class JupyterTimeoutError(SlackError):
"""Timed out waiting for the lab to spawn."""

def __init__(self, user: str, msg: str, log: Optional[str] = None) -> None:
super().__init__(user, msg)
self.log = log

def to_slack(self) -> Dict[str, Any]:
"""Format the error as a Slack Block Kit message."""
return {
result = {
"blocks": [
{
"type": "section",
"text": {"type": "mrkdwn", "text": str(self)},
},
{"type": "section", "fields": self.common_fields()},
{"type": "divider"},
]
}
if self.log:
result["blocks"].append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Log*\n{self.log}",
"verbatim": True,
},
}
)
result["blocks"].append({"type": "divider"})
return result
2 changes: 1 addition & 1 deletion src/mobu/handlers/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
from fastapi.responses import FileResponse, JSONResponse
from safir.dependencies.logger import logger_dependency
from safir.metadata import get_metadata
from safir.models import ErrorModel
from structlog.stdlib import BoundLogger

from ..config import config
from ..dependencies.manager import (
MonkeyBusinessManager,
monkey_business_manager,
)
from ..models.error import ErrorModel
from ..models.flock import FlockConfig, FlockData
from ..models.index import Index
from ..models.monkey import MonkeyData
Expand Down
Loading

0 comments on commit d6fe074

Please sign in to comment.