From 759aa11c0848ca10fb96adf8d07ee579e34bec92 Mon Sep 17 00:00:00 2001 From: Tasko Olevski Date: Fri, 27 Sep 2024 16:01:32 +0200 Subject: [PATCH] feat: use cloud storage from amalthea (#387) Co-authored-by: Samuel Gaist --- bases/renku_data_services/data_api/app.py | 2 + .../notebooks/api.spec.yaml | 3 +- .../api/amalthea_patches/git_sidecar.py | 4 +- .../api/amalthea_patches/init_containers.py | 12 +- .../api/amalthea_patches/jupyter_server.py | 6 +- .../api/classes/cloud_storage/__init__.py | 8 +- .../notebooks/api/classes/image.py | 6 +- .../notebooks/api/classes/k8s_client.py | 14 +- .../notebooks/api/classes/server.py | 20 +-- .../notebooks/api/schemas/cloud_storage.py | 121 +++++++++++------- .../renku_data_services/notebooks/apispec.py | 4 +- .../notebooks/blueprints.py | 121 +++++++++++++----- .../notebooks/cr_amalthea_session.py | 2 +- .../renku_data_services/notebooks/crs.py | 2 + .../renku_data_services/storage/blueprints.py | 1 - components/renku_data_services/storage/db.py | 12 +- components/renku_data_services/storage/orm.py | 4 +- .../data_api/test_notebooks.py | 2 +- 18 files changed, 219 insertions(+), 125 deletions(-) diff --git a/bases/renku_data_services/data_api/app.py b/bases/renku_data_services/data_api/app.py index 48ed004fd..cf4b6a9a8 100644 --- a/bases/renku_data_services/data_api/app.py +++ b/bases/renku_data_services/data_api/app.py @@ -138,6 +138,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic: nb_config=config.nb_config, internal_gitlab_authenticator=config.gitlab_authenticator, git_repo=config.git_repositories_repo, + rp_repo=config.rp_repo, ) notebooks_new = NotebooksNewBP( name="notebooks", @@ -146,6 +147,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic: nb_config=config.nb_config, project_repo=config.project_repo, session_repo=config.session_repo, + storage_repo=config.storage_v2_repo, rp_repo=config.rp_repo, internal_gitlab_authenticator=config.gitlab_authenticator, ) diff --git 
a/components/renku_data_services/notebooks/api.spec.yaml b/components/renku_data_services/notebooks/api.spec.yaml index 65514be0d..8767dc738 100644 --- a/components/renku_data_services/notebooks/api.spec.yaml +++ b/components/renku_data_services/notebooks/api.spec.yaml @@ -1021,11 +1021,12 @@ components: storage_id: allOf: - "$ref": "#/components/schemas/Ulid" - - description: If the storage_id is provided then this config must replace an existing storage config in the session + - description: The storage ID is used to know which storage config from the DB should be overridden required: - configuration - source_path - target_path + - storage_id ServerName: type: string minLength: 5 diff --git a/components/renku_data_services/notebooks/api/amalthea_patches/git_sidecar.py b/components/renku_data_services/notebooks/api/amalthea_patches/git_sidecar.py index 0e76516e3..ceac8e248 100644 --- a/components/renku_data_services/notebooks/api/amalthea_patches/git_sidecar.py +++ b/components/renku_data_services/notebooks/api/amalthea_patches/git_sidecar.py @@ -22,7 +22,7 @@ async def main(server: "UserServer") -> list[dict[str, Any]]: commit_sha = getattr(server, "commit_sha", None) volume_mount = { - "mountPath": server.work_dir.absolute().as_posix(), + "mountPath": server.work_dir.as_posix(), "name": "workspace", } if gl_project_path: @@ -51,7 +51,7 @@ async def main(server: "UserServer") -> list[dict[str, Any]]: "env": [ { "name": "GIT_RPC_MOUNT_PATH", - "value": server.work_dir.absolute().as_posix(), + "value": server.work_dir.as_posix(), }, { "name": "GIT_RPC_PORT", diff --git a/components/renku_data_services/notebooks/api/amalthea_patches/init_containers.py b/components/renku_data_services/notebooks/api/amalthea_patches/init_containers.py index bec03016b..fbb789372 100644 --- a/components/renku_data_services/notebooks/api/amalthea_patches/init_containers.py +++ b/components/renku_data_services/notebooks/api/amalthea_patches/init_containers.py @@ -35,11 +35,11 @@ async def
git_clone_container_v2(server: "UserServer") -> dict[str, Any] | None: env = [ { "name": f"{prefix}WORKSPACE_MOUNT_PATH", - "value": server.workspace_mount_path.absolute().as_posix(), + "value": server.workspace_mount_path.as_posix(), }, { "name": f"{prefix}MOUNT_PATH", - "value": server.work_dir.absolute().as_posix(), + "value": server.work_dir.as_posix(), }, { "name": f"{prefix}LFS_AUTO_FETCH", @@ -134,7 +134,7 @@ async def git_clone_container_v2(server: "UserServer") -> dict[str, Any] | None: }, "volumeMounts": [ { - "mountPath": server.workspace_mount_path.absolute().as_posix(), + "mountPath": server.workspace_mount_path.as_posix(), "name": amalthea_session_work_volume, }, *etc_cert_volume_mount, @@ -161,11 +161,11 @@ async def git_clone_container(server: "UserServer") -> dict[str, Any] | None: env = [ { "name": f"{prefix}WORKSPACE_MOUNT_PATH", - "value": server.workspace_mount_path.absolute().as_posix(), + "value": server.workspace_mount_path.as_posix(), }, { "name": f"{prefix}MOUNT_PATH", - "value": server.work_dir.absolute().as_posix(), + "value": server.work_dir.as_posix(), }, { "name": f"{prefix}LFS_AUTO_FETCH", @@ -260,7 +260,7 @@ async def git_clone_container(server: "UserServer") -> dict[str, Any] | None: }, "volumeMounts": [ { - "mountPath": server.workspace_mount_path.absolute().as_posix(), + "mountPath": server.workspace_mount_path.as_posix(), "name": "workspace", }, *etc_cert_volume_mount, diff --git a/components/renku_data_services/notebooks/api/amalthea_patches/jupyter_server.py b/components/renku_data_services/notebooks/api/amalthea_patches/jupyter_server.py index 31b1a26e3..6f7affc09 100644 --- a/components/renku_data_services/notebooks/api/amalthea_patches/jupyter_server.py +++ b/components/renku_data_services/notebooks/api/amalthea_patches/jupyter_server.py @@ -43,7 +43,7 @@ def env(server: "UserServer") -> list[dict[str, Any]]: "path": "/statefulset/spec/template/spec/containers/0/env/-", "value": { "name": "NOTEBOOK_DIR", - "value": 
server.work_dir.absolute().as_posix(), + "value": server.work_dir.as_posix(), }, }, { @@ -53,7 +53,7 @@ def env(server: "UserServer") -> list[dict[str, Any]]: # relative to $HOME. "value": { "name": "MOUNT_PATH", - "value": server.work_dir.absolute().as_posix(), + "value": server.work_dir.as_posix(), }, }, { @@ -223,7 +223,7 @@ def rstudio_env_variables(server: "UserServer") -> list[dict[str, Any]]: "path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-", "value": { "name": secret_name, - "mountPath": mount_location.absolute().as_posix(), + "mountPath": mount_location.as_posix(), "subPath": mount_location.name, "readOnly": True, }, diff --git a/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py b/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py index 015653284..a66b2728d 100644 --- a/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py +++ b/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py @@ -6,17 +6,15 @@ class ICloudStorageRequest(Protocol): """The abstract class for cloud storage.""" - exists: bool mount_folder: str - source_folder: str - bucket: str + source_path: str def get_manifest_patch( self, base_name: str, namespace: str, - labels: dict[str, str] = {}, - annotations: dict[str, str] = {}, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, ) -> list[dict[str, Any]]: """The patches applied to a jupyter server to insert the storage in the session.""" ... 
diff --git a/components/renku_data_services/notebooks/api/classes/image.py b/components/renku_data_services/notebooks/api/classes/image.py index 6a38aaf7f..6ced400eb 100644 --- a/components/renku_data_services/notebooks/api/classes/image.py +++ b/components/renku_data_services/notebooks/api/classes/image.py @@ -4,7 +4,7 @@ import re from dataclasses import dataclass, field from enum import Enum -from pathlib import Path +from pathlib import PurePosixPath from typing import Any, Optional, Self, cast import requests @@ -101,7 +101,7 @@ def get_image_config(self, image: "Image") -> Optional[dict[str, Any]]: return None return cast(dict[str, Any], res.json()) - def image_workdir(self, image: "Image") -> Optional[Path]: + def image_workdir(self, image: "Image") -> Optional[PurePosixPath]: """Query the docker API to get the workdir of an image.""" config = self.get_image_config(image) if config is None: @@ -112,7 +112,7 @@ def image_workdir(self, image: "Image") -> Optional[Path]: workdir = nested_config.get("WorkingDir", "/") if workdir == "": workdir = "/" - return Path(workdir) + return PurePosixPath(workdir) def with_oauth2_token(self, oauth2_token: str) -> "ImageRepoDockerAPI": """Return a docker API instance with the token as authentication.""" diff --git a/components/renku_data_services/notebooks/api/classes/k8s_client.py b/components/renku_data_services/notebooks/api/classes/k8s_client.py index b6ef3420d..6abef4217 100644 --- a/components/renku_data_services/notebooks/api/classes/k8s_client.py +++ b/components/renku_data_services/notebooks/api/classes/k8s_client.py @@ -353,10 +353,13 @@ def __init__(self, url: str, server_type: type[_SessionType]): self.url = url self.client = httpx.AsyncClient() self.server_type: type[_SessionType] = server_type + self.url_path_name = "servers" + if server_type == AmaltheaSessionV1Alpha1: + self.url_path_name = "sessions" async def list_servers(self, safe_username: str) -> list[_SessionType]: """List the jupyter servers.""" - 
url = urljoin(self.url, f"/users/{safe_username}/servers") + url = urljoin(self.url, f"/users/{safe_username}/{self.url_path_name}") try: res = await self.client.get(url, timeout=10) except httpx.RequestError as err: @@ -374,7 +377,7 @@ async def list_servers(self, safe_username: str) -> list[_SessionType]: async def get_server(self, name: str) -> _SessionType | None: """Get a specific jupyter server.""" - url = urljoin(self.url, f"/servers/{name}") + url = urljoin(self.url, f"/{self.url_path_name}/{name}") try: res = await self.client.get(url, timeout=10) except httpx.RequestError as err: @@ -487,10 +490,9 @@ async def delete_server(self, server_name: str, safe_username: str) -> None: """Delete the server.""" server = await self.get_server(server_name, safe_username) if not server: - raise MissingResourceError( - f"Cannot find server {server_name} for user " f"{safe_username} in order to delete it." - ) - return await self.renku_ns_client.delete_server(server_name) + return None + await self.renku_ns_client.delete_server(server_name) + return None async def patch_tokens(self, server_name: str, renku_tokens: RenkuTokens, gitlab_token: GitlabToken) -> None: """Patch the Renku and Gitlab access tokens used in a session.""" diff --git a/components/renku_data_services/notebooks/api/classes/server.py b/components/renku_data_services/notebooks/api/classes/server.py index f28ff1071..147b02e7c 100644 --- a/components/renku_data_services/notebooks/api/classes/server.py +++ b/components/renku_data_services/notebooks/api/classes/server.py @@ -3,7 +3,7 @@ from abc import ABC from collections.abc import Sequence from itertools import chain -from pathlib import Path +from pathlib import PurePosixPath from typing import Any from urllib.parse import urljoin, urlparse @@ -44,8 +44,8 @@ def __init__( user_secrets: K8sUserSecrets | None, cloudstorage: Sequence[ICloudStorageRequest], k8s_client: K8sClient, - workspace_mount_path: Path, - work_dir: Path, + workspace_mount_path: 
PurePosixPath, + work_dir: PurePosixPath, config: _NotebooksConfig, internal_gitlab_user: APIUser, using_default_image: bool = False, @@ -205,7 +205,7 @@ async def _get_session_manifest(self) -> dict[str, Any]: "pvc": { "enabled": True, "storageClassName": self.config.sessions.storage.pvs_storage_class, - "mountPath": self.workspace_mount_path.absolute().as_posix(), + "mountPath": self.workspace_mount_path.as_posix(), }, } else: @@ -214,7 +214,7 @@ async def _get_session_manifest(self) -> dict[str, Any]: "size": storage_size, "pvc": { "enabled": False, - "mountPath": self.workspace_mount_path.absolute().as_posix(), + "mountPath": self.workspace_mount_path.as_posix(), }, } # Authentication @@ -257,7 +257,7 @@ async def _get_session_manifest(self) -> dict[str, Any]: "jupyterServer": { "defaultUrl": self.server_options.default_url, "image": self.image, - "rootDir": self.work_dir.absolute().as_posix(), + "rootDir": self.work_dir.as_posix(), "resources": self.server_options.to_k8s_resources( enforce_cpu_limits=self.config.sessions.enforce_cpu_limits ), @@ -378,8 +378,8 @@ def __init__( user_secrets: K8sUserSecrets | None, cloudstorage: Sequence[ICloudStorageRequest], k8s_client: K8sClient, - workspace_mount_path: Path, - work_dir: Path, + workspace_mount_path: PurePosixPath, + work_dir: PurePosixPath, config: _NotebooksConfig, gitlab_project: Project | None, internal_gitlab_user: APIUser, @@ -503,8 +503,8 @@ def __init__( user_secrets: K8sUserSecrets | None, cloudstorage: Sequence[ICloudStorageRequest], k8s_client: K8sClient, - workspace_mount_path: Path, - work_dir: Path, + workspace_mount_path: PurePosixPath, + work_dir: PurePosixPath, repositories: list[Repository], config: _NotebooksConfig, internal_gitlab_user: APIUser, diff --git a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py index 05b141c3c..5b848f8fa 100644 --- 
a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py +++ b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py @@ -2,15 +2,18 @@ from configparser import ConfigParser from io import StringIO -from pathlib import Path -from typing import Any, Optional, Self +from pathlib import PurePosixPath +from typing import Any, Final, Optional, Self +from kubernetes import client from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema from renku_data_services.base_models import APIUser from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest from renku_data_services.notebooks.config import _NotebooksConfig +_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization + class RCloneStorageRequest(Schema): """Request for RClone based storage.""" @@ -36,6 +39,8 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None: class RCloneStorage(ICloudStorageRequest): """RClone based storage.""" + pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName" + def __init__( self, source_path: str, @@ -60,7 +65,7 @@ async def storage_from_schema( user: APIUser, internal_gitlab_user: APIUser, project_id: int, - work_dir: Path, + work_dir: PurePosixPath, config: _NotebooksConfig, ) -> Self: """Create storage object from request.""" @@ -92,8 +97,73 @@ async def storage_from_schema( await config.storage_validator.validate_storage_configuration(configuration, source_path) return cls(source_path, configuration, readonly, mount_folder, name, config) + def pvc( + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, + ) -> client.V1PersistentVolumeClaim: + """The PVC for mounting cloud storage.""" + return client.V1PersistentVolumeClaim( + metadata=client.V1ObjectMeta( + name=base_name, + namespace=namespace, + annotations={self.pvc_secret_annotation_name: base_name} | (annotations or {}), + 
labels={"name": base_name} | (labels or {}), + ), + spec=client.V1PersistentVolumeClaimSpec( + access_modes=["ReadOnlyMany" if self.readonly else "ReadWriteMany"], + resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}), + storage_class_name=self.config.cloud_storage.storage_class, + ), + ) + + def volume_mount(self, base_name: str) -> client.V1VolumeMount: + """The volume mount for cloud storage.""" + return client.V1VolumeMount( + mount_path=self.mount_folder, + name=base_name, + read_only=self.readonly, + ) + + def volume(self, base_name: str) -> client.V1Volume: + """The volume entry for the statefulset specification.""" + return client.V1Volume( + name=base_name, + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=base_name, read_only=self.readonly + ), + ) + + def secret( + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, + ) -> client.V1Secret: + """The secret containing the configuration for the rclone csi driver.""" + return client.V1Secret( + metadata=client.V1ObjectMeta( + name=base_name, + namespace=namespace, + annotations=annotations, + labels={"name": base_name} | (labels or {}), + ), + string_data={ + "remote": self.name or base_name, + "remotePath": self.source_path, + "configData": self.config_string(self.name or base_name), + }, + ) + def get_manifest_patch( - self, base_name: str, namespace: str, labels: dict = {}, annotations: dict = {} + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, ) -> list[dict[str, Any]]: """Get server manifest patch.""" patches = [] @@ -104,57 +174,22 @@ def get_manifest_patch( { "op": "add", "path": f"/{base_name}-pv", - "value": { - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": { - "name": base_name, - "labels": {"name": base_name}, - }, - "spec": { - "accessModes": ["ReadOnlyMany" if 
self.readonly else "ReadWriteMany"], - "resources": {"requests": {"storage": "10Gi"}}, - "storageClassName": self.config.cloud_storage.storage_class, - }, - }, + "value": _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations)), }, { "op": "add", "path": f"/{base_name}-secret", - "value": { - "apiVersion": "v1", - "kind": "Secret", - "metadata": { - "name": base_name, - "labels": {"name": base_name}, - }, - "type": "Opaque", - "stringData": { - "remote": self.name or base_name, - "remotePath": self.source_path, - "configData": self.config_string(self.name or base_name), - }, - }, + "value": _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations)), }, { "op": "add", "path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-", - "value": { - "mountPath": self.mount_folder, - "name": base_name, - "readOnly": self.readonly, - }, + "value": _sanitize_for_serialization(self.volume_mount(base_name)), }, { "op": "add", "path": "/statefulset/spec/template/spec/volumes/-", - "value": { - "name": base_name, - "persistentVolumeClaim": { - "claimName": base_name, - "readOnly": self.readonly, - }, - }, + "value": _sanitize_for_serialization(self.volume(base_name)), }, ], } diff --git a/components/renku_data_services/notebooks/apispec.py b/components/renku_data_services/notebooks/apispec.py index 9848bf384..331ad10a5 100644 --- a/components/renku_data_services/notebooks/apispec.py +++ b/components/renku_data_services/notebooks/apispec.py @@ -263,8 +263,8 @@ class SessionCloudStoragePost(BaseAPISpec): readonly: bool = True source_path: str target_path: str - storage_id: Optional[str] = Field( - None, + storage_id: str = Field( + ..., description="ULID identifier", max_length=26, min_length=26, diff --git a/components/renku_data_services/notebooks/blueprints.py b/components/renku_data_services/notebooks/blueprints.py index 670d24532..46d965fa4 100644 --- a/components/renku_data_services/notebooks/blueprints.py +++ 
b/components/renku_data_services/notebooks/blueprints.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from datetime import UTC, datetime from math import floor -from pathlib import Path +from pathlib import PurePosixPath from typing import Any from urllib.parse import urljoin, urlparse @@ -16,7 +16,7 @@ from gitlab.v4.objects.projects import Project as GitlabProject from kubernetes.client import V1ObjectMeta, V1Secret from marshmallow import ValidationError -from sanic import Request, empty, exceptions, json +from sanic import Request, empty, json from sanic.log import logger from sanic.response import HTTPResponse, JSONResponse from sanic_ext import validate @@ -55,6 +55,7 @@ Authentication, AuthenticationType, Culling, + DataSource, ExtraContainer, ExtraVolume, ExtraVolumeMount, @@ -64,7 +65,8 @@ Resources, SecretAsVolume, SecretAsVolumeItem, - SecretRef, + SecretRefKey, + SecretRefWhole, Session, SessionEnvItem, State, @@ -73,7 +75,7 @@ ) from renku_data_services.notebooks.errors.intermittent import AnonymousUserPatchError, PVDisabledError from renku_data_services.notebooks.errors.programming import ProgrammingError -from renku_data_services.notebooks.errors.user import MissingResourceError, UserInputError +from renku_data_services.notebooks.errors.user import MissingResourceError from renku_data_services.notebooks.util.kubernetes_ import ( find_container, renku_1_make_server_name, @@ -83,6 +85,7 @@ from renku_data_services.project.db import ProjectRepository from renku_data_services.repositories.db import GitRepositoriesRepository from renku_data_services.session.db import SessionRepository +from renku_data_services.storage.db import StorageV2Repository @dataclass(kw_only=True) @@ -93,6 +96,7 @@ class NotebooksBP(CustomBlueprint): nb_config: _NotebooksConfig git_repo: GitRepositoriesRepository internal_gitlab_authenticator: base_models.Authenticator + rp_repo: ResourcePoolRepository def version(self) -> BlueprintFactoryResponse: """Return notebook services 
version.""" @@ -157,7 +161,7 @@ async def _user_server( ) -> JSONResponse: server = await self.nb_config.k8s_client.get_server(server_name, user.id) if server is None: - raise MissingResourceError(message=f"The server {server_name} does not exist.") + raise errors.MissingResourceError(message=f"The server {server_name} does not exist.") server = UserServerManifest(server, self.nb_config.sessions.default_image) return json(NotebookResponse().dump(server)) @@ -346,14 +350,14 @@ async def launch_notebook_helper( if is_image_private and internal_gitlab_user.access_token: image_repo = image_repo.with_oauth2_token(internal_gitlab_user.access_token) if not image_repo.image_exists(parsed_image): - raise MissingResourceError( + raise errors.MissingResourceError( message=( f"Cannot start the session because the following the image {image} does not " "exist or the user does not have the permissions to access it." ) ) else: - raise UserInputError(message="Cannot determine which Docker image to use.") + raise errors.ValidationError(message="Cannot determine which Docker image to use.") parsed_server_options: ServerOptions | None = None if resource_class_id is not None: @@ -381,7 +385,7 @@ async def launch_notebook_helper( # The old style API was used, try to find a matching class from the CRC service parsed_server_options = await nb_config.crc_validator.find_acceptable_class(user, requested_server_options) if parsed_server_options is None: - raise UserInputError( + raise errors.ValidationError( message="Cannot find suitable server options based on your request and " "the available resource classes.", detail="You are receiving this error because you are using the old API for " @@ -393,8 +397,8 @@ async def launch_notebook_helper( default_resource_class = await nb_config.crc_validator.get_default_class() max_storage_gb = default_resource_class.max_storage if storage is not None and storage > max_storage_gb: - raise UserInputError( - "The requested storage amount is higher than 
the " + raise errors.ValidationError( + message="The requested storage amount is higher than the " f"allowable maximum for the default resource class of {max_storage_gb}GB." ) if storage is None: @@ -409,7 +413,7 @@ async def launch_notebook_helper( if lfs_auto_fetch is not None: parsed_server_options.lfs_auto_fetch = lfs_auto_fetch - image_work_dir = image_repo.image_workdir(parsed_image) or Path("/") + image_work_dir = image_repo.image_workdir(parsed_image) or PurePosixPath("/") mount_path = image_work_dir / "work" server_work_dir = mount_path / gl_project_path @@ -424,20 +428,22 @@ async def launch_notebook_helper( cstorage.model_dump(), user=user, project_id=gl_project_id, - work_dir=server_work_dir.absolute(), + work_dir=server_work_dir, config=nb_config, internal_gitlab_user=internal_gitlab_user, ) ) except ValidationError as e: - raise UserInputError(f"Couldn't load cloud storage config: {str(e)}") + raise errors.ValidationError(message=f"Couldn't load cloud storage config: {str(e)}") mount_points = set(s.mount_folder for s in storages if s.mount_folder and s.mount_folder != "/") if len(mount_points) != len(storages): - raise UserInputError( - "Storage mount points must be set, can't be at the root of the project and must be unique." + raise errors.ValidationError( + message="Storage mount points must be set, can't be at the root of the project and must be unique." ) if any(s1.mount_folder.startswith(s2.mount_folder) for s1 in storages for s2 in storages if s1 != s2): - raise UserInputError("Cannot mount a cloud storage into the mount point of another cloud storage.") + raise errors.ValidationError( + message="Cannot mount a cloud storage into the mount point of another cloud storage." 
+ ) repositories = repositories or [] @@ -475,7 +481,7 @@ async def launch_notebook_helper( ) if len(server.safe_username) > 63: - raise UserInputError( + raise errors.ValidationError( message="A username cannot be longer than 63 characters, " f"your username is {len(server.safe_username)} characters long.", detail="This can occur if your username has been changed manually or by an admin.", @@ -553,7 +559,9 @@ async def _patch_server( state = PatchServerStatusEnum.from_api_state(body.state) if body.state is not None else None resource_class_id = patch_body.resource_class_id if server and not (currently_hibernated or currently_failing) and resource_class_id: - raise UserInputError("The resource class can be changed only if the server is hibernated or failing") + raise errors.ValidationError( + message="The resource class can be changed only if the server is hibernated or failing" + ) if resource_class_id: parsed_server_options = await self.nb_config.crc_validator.validate_class_storage( @@ -700,12 +708,9 @@ def stop_server(self) -> BlueprintFactoryResponse: @authenticate(self.authenticator) async def _stop_server( - request: Request, user: AnonymousAPIUser | AuthenticatedAPIUser, server_name: str + _: Request, user: AnonymousAPIUser | AuthenticatedAPIUser, server_name: str ) -> HTTPResponse: - try: - await self.nb_config.k8s_client.delete_server(server_name, safe_username=user.id) - except MissingResourceError as err: - raise exceptions.NotFound(message=err.message) + await self.nb_config.k8s_client.delete_server(server_name, safe_username=user.id) return HTTPResponse(status=204) return "/notebooks/servers/", ["DELETE"], _stop_server @@ -744,7 +749,7 @@ async def _server_logs( ) return json(ServerLogs().dump(logs)) except MissingResourceError as err: - raise exceptions.NotFound(message=err.message) + raise errors.MissingResourceError(message=err.message) return "/notebooks/logs/", ["GET"], _server_logs @@ -780,6 +785,7 @@ class NotebooksNewBP(CustomBlueprint): 
project_repo: ProjectRepository session_repo: SessionRepository rp_repo: ResourcePoolRepository + storage_repo: StorageV2Repository def start(self) -> BlueprintFactoryResponse: """Start a session with the new operator.""" @@ -810,7 +816,7 @@ async def _handler( parsed_server_options = await self.nb_config.crc_validator.validate_class_storage( user, resource_class_id, body.disk_storage ) - work_dir = Path("/home/jovyan/work") + work_dir = environment.working_directory user_secrets: K8sUserSecrets | None = None # if body.user_secrets: # user_secrets = K8sUserSecrets( @@ -818,8 +824,45 @@ async def _handler( # user_secret_ids=body.user_secrets.user_secret_ids, # mount_path=body.user_secrets.mount_path, # ) - cloud_storage: list[RCloneStorage] = [] - repositories = [Repository(url=repository) for repository in project.repositories] + cloud_storages_db = await self.storage_repo.get_storage( + user=user, project_id=project.id, include_secrets=True + ) + cloud_storage: dict[str, RCloneStorage] = { + str(s.storage_id): RCloneStorage( + source_path=s.source_path, + mount_folder=(work_dir / s.target_path).as_posix(), + configuration=s.configuration.model_dump(mode="python"), + readonly=s.readonly, + config=self.nb_config, + name=s.name, + ) + for s in cloud_storages_db + } + cloud_storage_request: dict[str, RCloneStorage] = { + s.storage_id: RCloneStorage( + source_path=s.source_path, + mount_folder=(work_dir / s.target_path).as_posix(), + configuration=s.configuration, + readonly=s.readonly, + config=self.nb_config, + name=None, + ) + for s in body.cloudstorage or [] + } + # NOTE: Check the cloud storage in the request body and if any match + # then overwrite the project's cloud storages + # NOTE: Cloud storages in the session launch request body that are not from the DB will cause a 422 error + for csr_id, csr in cloud_storage_request.items(): + if csr_id not in cloud_storage: + raise errors.MissingResourceError( + message=f"You have requested a cloud storage with ID
{csr_id} which does not exist " + "or you dont have access to.", + quiet=True, + ) + cloud_storage[csr_id] = csr + # repositories = [Repository(i.url, branch=i.branch, commit_sha=i.commit_sha) for i in body.repositories] + repositories = [Repository(url=i) for i in project.repositories] + secrets_to_create: list[V1Secret] = [] server = Renku2UserServer( user=user, image=image, @@ -829,7 +872,7 @@ async def _handler( server_options=parsed_server_options, environment_variables={}, user_secrets=user_secrets, - cloudstorage=cloud_storage, + cloudstorage=[i for i in cloud_storage.values()], k8s_client=self.nb_config.k8s_v2_client, workspace_mount_path=work_dir, work_dir=work_dir, @@ -839,6 +882,14 @@ async def _handler( is_image_private=False, internal_gitlab_user=internal_gitlab_user, ) + # Generate the cloud storage secrets + data_sources: list[DataSource] = [] + for ics, cs in enumerate(cloud_storage.values()): + secret_name = f"{server_name}-ds-{ics}" + secrets_to_create.append(cs.secret(secret_name, server.k8s_client.preferred_namespace)) + data_sources.append( + DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True)) + ) cert_init, cert_vols = init_containers.certificates_container(self.nb_config) session_init_containers = [InitContainer.model_validate(self.nb_config.k8s_v2_client.sanitize(cert_init))] extra_volumes = [ @@ -874,7 +925,6 @@ async def _handler( metadata=Metadata(name=server_name, annotations=annotations), spec=AmaltheaSessionSpec( codeRepositories=[], - dataSources=[], hibernated=False, session=Session( image=image, @@ -921,13 +971,14 @@ async def _handler( type=AuthenticationType.oauth2proxy if isinstance(user, AuthenticatedAPIUser) else AuthenticationType.token, - secretRef=SecretRef(name=server_name, key="auth", adopt=True), + secretRef=SecretRefKey(name=server_name, key="auth", adopt=True), extraVolumeMounts=[ ExtraVolumeMount(name="renku-authorized-emails", mountPath="/authorized_emails") ] if 
isinstance(user, AuthenticatedAPIUser) else [], ), + dataSources=data_sources, ), ) parsed_proxy_url = urlparse(urljoin(server.server_url + "/", "oauth2")) @@ -958,12 +1009,14 @@ async def _handler( "verbose": True, } ) - secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data) - secret = await self.nb_config.k8s_v2_client.create_secret(secret) + secrets_to_create.append(V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)) + for s in secrets_to_create: + await self.nb_config.k8s_v2_client.create_secret(s) try: manifest = await self.nb_config.k8s_v2_client.create_server(manifest, user.id) except Exception: - await self.nb_config.k8s_v2_client.delete_secret(secret.metadata.name) + for s in secrets_to_create: + await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name) raise errors.ProgrammingError(message="Could not start the amalthea session") return json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201) @@ -1080,6 +1133,6 @@ async def _handler( query: apispec.SessionsSessionIdLogsGetParametersQuery, ) -> HTTPResponse: logs = await self.nb_config.k8s_v2_client.get_server_logs(session_id, user.id, query.max_lines) - return json(apispec.SessionLogsResponse.model_validate(logs).model_dump_json(exclude_none=True)) + return json(apispec.SessionLogsResponse.model_validate(logs).model_dump(exclude_none=True)) return "/sessions//logs", ["GET"], _handler diff --git a/components/renku_data_services/notebooks/cr_amalthea_session.py b/components/renku_data_services/notebooks/cr_amalthea_session.py index 16aa355e0..a4c2e3fd9 100644 --- a/components/renku_data_services/notebooks/cr_amalthea_session.py +++ b/components/renku_data_services/notebooks/cr_amalthea_session.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: -# timestamp: 2024-09-04T22:45:28+00:00 +# timestamp: 2024-09-04T21:22:45+00:00 from __future__ import annotations diff --git a/components/renku_data_services/notebooks/crs.py 
b/components/renku_data_services/notebooks/crs.py index 206a32f96..76ea08dc3 100644 --- a/components/renku_data_services/notebooks/crs.py +++ b/components/renku_data_services/notebooks/crs.py @@ -32,6 +32,8 @@ from renku_data_services.notebooks.cr_amalthea_session import Model as _ASModel from renku_data_services.notebooks.cr_amalthea_session import Resources3 as Resources from renku_data_services.notebooks.cr_amalthea_session import Secret1 as SecretAsVolume +from renku_data_services.notebooks.cr_amalthea_session import SecretRef as SecretRefKey +from renku_data_services.notebooks.cr_amalthea_session import SecretRef1 as SecretRefWhole from renku_data_services.notebooks.cr_amalthea_session import Spec as AmaltheaSessionSpec from renku_data_services.notebooks.cr_amalthea_session import Type as AuthenticationType from renku_data_services.notebooks.cr_amalthea_session import Type1 as CodeRepositoryType diff --git a/components/renku_data_services/storage/blueprints.py b/components/renku_data_services/storage/blueprints.py index ce72389ff..4ae20b1d1 100644 --- a/components/renku_data_services/storage/blueprints.py +++ b/components/renku_data_services/storage/blueprints.py @@ -46,7 +46,6 @@ async def _get( validator: RCloneValidator, query: apispec.StorageParams, ) -> JSONResponse: - storage: list[models.CloudStorage] storage = await self.storage_repo.get_storage(user=user, project_id=query.project_id) return json([dump_storage_with_sensitive_fields(s, validator) for s in storage]) diff --git a/components/renku_data_services/storage/db.py b/components/renku_data_services/storage/db.py index 7b4e3cda6..4cbfb1788 100644 --- a/components/renku_data_services/storage/db.py +++ b/components/renku_data_services/storage/db.py @@ -49,7 +49,7 @@ async def get_storage( project_id: str | ULID | None = None, name: str | None = None, filter_by_access_level: bool = True, - ) -> list[models.CloudStorage]: + ) -> list[models.SavedCloudStorage]: """Get a storage from the database.""" 
async with self.session_maker() as session: if not project_id and not name and not id: @@ -60,7 +60,7 @@ async def get_storage( stmt = select(schemas.CloudStorageORM) if project_id is not None: - stmt = stmt.where(schemas.CloudStorageORM.project_id == project_id) + stmt = stmt.where(schemas.CloudStorageORM.project_id == str(project_id)) if id is not None: stmt = stmt.where(schemas.CloudStorageORM.storage_id == id) if name is not None: @@ -78,7 +78,7 @@ async def get_storage( return [s.dump() for s in storage_orms if s.project_id in accessible_projects] - async def get_storage_by_id(self, storage_id: ULID, user: base_models.APIUser) -> models.CloudStorage: + async def get_storage_by_id(self, storage_id: ULID, user: base_models.APIUser) -> models.SavedCloudStorage: """Get a single storage by id.""" storages = await self.get_storage(user, id=str(storage_id), filter_by_access_level=False) @@ -91,7 +91,7 @@ async def get_storage_by_id(self, storage_id: ULID, user: base_models.APIUser) - async def insert_storage( self, storage: models.UnsavedCloudStorage, user: base_models.APIUser - ) -> models.CloudStorage: + ) -> models.SavedCloudStorage: """Insert a new cloud storage entry.""" if not await self.filter_projects_by_access_level(user, [storage.project_id], authz_models.Role.OWNER): raise errors.ForbiddenError(message="User does not have access to this project") @@ -105,7 +105,9 @@ async def insert_storage( session.add(orm) return orm.dump() - async def update_storage(self, storage_id: ULID, user: base_models.APIUser, **kwargs: dict) -> models.CloudStorage: + async def update_storage( + self, storage_id: ULID, user: base_models.APIUser, **kwargs: dict + ) -> models.SavedCloudStorage: """Update a cloud storage entry.""" async with self.session_maker() as session, session.begin(): res = await session.execute( diff --git a/components/renku_data_services/storage/orm.py b/components/renku_data_services/storage/orm.py index cf5fe9106..e9b61fb7f 100644 --- 
a/components/renku_data_services/storage/orm.py +++ b/components/renku_data_services/storage/orm.py @@ -74,9 +74,9 @@ def load(cls, storage: models.UnsavedCloudStorage) -> "CloudStorageORM": readonly=storage.readonly, ) - def dump(self) -> models.CloudStorage: + def dump(self) -> models.SavedCloudStorage: """Create a cloud storage model from the ORM object.""" - return models.CloudStorage( + return models.SavedCloudStorage( project_id=self.project_id, name=self.name, storage_type=self.storage_type, diff --git a/test/bases/renku_data_services/data_api/test_notebooks.py b/test/bases/renku_data_services/data_api/test_notebooks.py index bfafc3895..e9e973a8a 100644 --- a/test/bases/renku_data_services/data_api/test_notebooks.py +++ b/test/bases/renku_data_services/data_api/test_notebooks.py @@ -183,7 +183,7 @@ async def test_server_options(sanic_client: SanicASGITestClient, user_headers): @pytest.mark.asyncio @pytest.mark.parametrize( - "server_name_fixture,expected_status_code", [("unknown_server_name", 404), ("server_name", 204)] + "server_name_fixture,expected_status_code", [("unknown_server_name", 204), ("server_name", 204)] ) async def test_stop_server( sanic_client: SanicASGITestClient,