From 4cfd95a1e42d05d44b08d11809c60072f447b02e Mon Sep 17 00:00:00 2001 From: Tasko Olevski Date: Wed, 4 Sep 2024 23:39:01 +0200 Subject: [PATCH] feat: add cloud storage via rclone --- bases/renku_data_services/data_api/app.py | 1 + .../api/classes/cloud_storage/__init__.py | 8 +- .../notebooks/api/schemas/cloud_storage.py | 121 +++++++++++------- .../notebooks/blueprints.py | 70 ++++++++-- .../notebooks/cr_amalthea_session.py | 2 +- .../renku_data_services/notebooks/crs.py | 2 + components/renku_data_services/storage/db.py | 2 +- 7 files changed, 144 insertions(+), 62 deletions(-) diff --git a/bases/renku_data_services/data_api/app.py b/bases/renku_data_services/data_api/app.py index aa3cfd116..a51de6989 100644 --- a/bases/renku_data_services/data_api/app.py +++ b/bases/renku_data_services/data_api/app.py @@ -142,6 +142,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic: nb_config=config.nb_config, internal_gitlab_authenticator=config.gitlab_authenticator, git_repo=config.git_repositories_repo, + rp_repo=config.rp_repo, ) notebooks_new = NotebooksNewBP( name="notebooks", diff --git a/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py b/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py index 015653284..a66b2728d 100644 --- a/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py +++ b/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py @@ -6,17 +6,15 @@ class ICloudStorageRequest(Protocol): """The abstract class for cloud storage.""" - exists: bool mount_folder: str - source_folder: str - bucket: str + source_path: str def get_manifest_patch( self, base_name: str, namespace: str, - labels: dict[str, str] = {}, - annotations: dict[str, str] = {}, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, ) -> list[dict[str, Any]]: """The patches applied to a jupyter server to insert the storage in the session.""" ... 
diff --git a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py index 05b141c3c..5b848f8fa 100644 --- a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py +++ b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py @@ -2,15 +2,18 @@ from configparser import ConfigParser from io import StringIO -from pathlib import Path -from typing import Any, Optional, Self +from pathlib import PurePosixPath +from typing import Any, Final, Optional, Self +from kubernetes import client from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema from renku_data_services.base_models import APIUser from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest from renku_data_services.notebooks.config import _NotebooksConfig +_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization + class RCloneStorageRequest(Schema): """Request for RClone based storage.""" @@ -36,6 +39,8 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None: class RCloneStorage(ICloudStorageRequest): """RClone based storage.""" + pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName" + def __init__( self, source_path: str, @@ -60,7 +65,7 @@ async def storage_from_schema( user: APIUser, internal_gitlab_user: APIUser, project_id: int, - work_dir: Path, + work_dir: PurePosixPath, config: _NotebooksConfig, ) -> Self: """Create storage object from request.""" @@ -92,8 +97,73 @@ async def storage_from_schema( await config.storage_validator.validate_storage_configuration(configuration, source_path) return cls(source_path, configuration, readonly, mount_folder, name, config) + def pvc( + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, + ) -> client.V1PersistentVolumeClaim: + """The PVC for mounting cloud storage.""" + return 
client.V1PersistentVolumeClaim( + metadata=client.V1ObjectMeta( + name=base_name, + namespace=namespace, + annotations={self.pvc_secret_annotation_name: base_name} | (annotations or {}), + labels={"name": base_name} | (labels or {}), + ), + spec=client.V1PersistentVolumeClaimSpec( + access_modes=["ReadOnlyMany" if self.readonly else "ReadWriteMany"], + resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}), + storage_class_name=self.config.cloud_storage.storage_class, + ), + ) + + def volume_mount(self, base_name: str) -> client.V1VolumeMount: + """The volume mount for cloud storage.""" + return client.V1VolumeMount( + mount_path=self.mount_folder, + name=base_name, + read_only=self.readonly, + ) + + def volume(self, base_name: str) -> client.V1Volume: + """The volume entry for the statefulset specification.""" + return client.V1Volume( + name=base_name, + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=base_name, read_only=self.readonly + ), + ) + + def secret( + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, + ) -> client.V1Secret: + """The secret containing the configuration for the rclone csi driver.""" + return client.V1Secret( + metadata=client.V1ObjectMeta( + name=base_name, + namespace=namespace, + annotations=annotations, + labels={"name": base_name} | (labels or {}), + ), + string_data={ + "remote": self.name or base_name, + "remotePath": self.source_path, + "configData": self.config_string(self.name or base_name), + }, + ) + def get_manifest_patch( - self, base_name: str, namespace: str, labels: dict = {}, annotations: dict = {} + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, ) -> list[dict[str, Any]]: """Get server manifest patch.""" patches = [] @@ -104,57 +174,22 @@ def get_manifest_patch( { "op": "add", "path": f"/{base_name}-pv", - 
"value": { - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": { - "name": base_name, - "labels": {"name": base_name}, - }, - "spec": { - "accessModes": ["ReadOnlyMany" if self.readonly else "ReadWriteMany"], - "resources": {"requests": {"storage": "10Gi"}}, - "storageClassName": self.config.cloud_storage.storage_class, - }, - }, + "value": _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations)), }, { "op": "add", "path": f"/{base_name}-secret", - "value": { - "apiVersion": "v1", - "kind": "Secret", - "metadata": { - "name": base_name, - "labels": {"name": base_name}, - }, - "type": "Opaque", - "stringData": { - "remote": self.name or base_name, - "remotePath": self.source_path, - "configData": self.config_string(self.name or base_name), - }, - }, + "value": _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations)), }, { "op": "add", "path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-", - "value": { - "mountPath": self.mount_folder, - "name": base_name, - "readOnly": self.readonly, - }, + "value": _sanitize_for_serialization(self.volume_mount(base_name)), }, { "op": "add", "path": "/statefulset/spec/template/spec/volumes/-", - "value": { - "name": base_name, - "persistentVolumeClaim": { - "claimName": base_name, - "readOnly": self.readonly, - }, - }, + "value": _sanitize_for_serialization(self.volume(base_name)), }, ], } diff --git a/components/renku_data_services/notebooks/blueprints.py b/components/renku_data_services/notebooks/blueprints.py index bde2d3421..a78e3726f 100644 --- a/components/renku_data_services/notebooks/blueprints.py +++ b/components/renku_data_services/notebooks/blueprints.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from datetime import UTC, datetime from math import floor -from pathlib import Path +from pathlib import PurePosixPath from typing import Any from urllib.parse import urljoin, urlparse @@ -55,6 +55,7 @@ Authentication, AuthenticationType, 
Culling, + DataSource, ExtraContainer, ExtraVolume, ExtraVolumeMount, @@ -64,7 +65,8 @@ Resources, SecretAsVolume, SecretAsVolumeItem, - SecretRef, + SecretRefKey, + SecretRefWhole, Session, SessionEnvItem, State, @@ -86,6 +88,7 @@ from renku_data_services.project.db import ProjectRepository from renku_data_services.repositories.db import GitRepositoriesRepository from renku_data_services.session.db import SessionRepository +from renku_data_services.storage.db import StorageV2Repository @dataclass(kw_only=True) @@ -415,7 +418,7 @@ async def launch_notebook_helper( if lfs_auto_fetch is not None: parsed_server_options.lfs_auto_fetch = lfs_auto_fetch - image_work_dir = image_repo.image_workdir(parsed_image) or Path("/") + image_work_dir = image_repo.image_workdir(parsed_image) or PurePosixPath("/") mount_path = image_work_dir / "work" server_work_dir = mount_path / gl_project_path @@ -430,7 +433,7 @@ async def launch_notebook_helper( cstorage.model_dump(), user=user, project_id=gl_project_id, - work_dir=server_work_dir.absolute(), + work_dir=server_work_dir, config=nb_config, internal_gitlab_user=internal_gitlab_user, ) @@ -774,6 +777,7 @@ class NotebooksNewBP(CustomBlueprint): project_repo: ProjectRepository session_repo: SessionRepository rp_repo: ResourcePoolRepository + storage_repo: StorageV2Repository def start(self) -> BlueprintFactoryResponse: """Start a session with the new operator.""" @@ -805,7 +809,7 @@ async def _handler( parsed_server_options = await self.nb_config.crc_validator.validate_class_storage( user, resource_class_id, body.disk_storage ) - work_dir = Path("/home/jovyan/work") + work_dir = environment.working_directory user_secrets: K8sUserSecrets | None = None # if body.user_secrets: # user_secrets = K8sUserSecrets( @@ -813,9 +817,41 @@ async def _handler( # user_secret_ids=body.user_secrets.user_secret_ids, # mount_path=body.user_secrets.mount_path, # ) - cloud_storage: list[RCloneStorage] = [] + cloud_storages_db = await 
self.storage_repo.get_storage( + user=user, project_id=project.id, include_secrets=True + ) + cloud_storage: dict[str, RCloneStorage] = { + str(s.storage_id): RCloneStorage( + source_path=s.source_path, + mount_folder=(work_dir / s.target_path).as_posix(), + configuration=s.configuration.model_dump(mode="python"), + readonly=s.readonly, + config=self.nb_config, + name=s.name, + ) + for s in cloud_storages_db + } + cloud_storage_request: dict[str, RCloneStorage] = { + s.storage_id: RCloneStorage( + source_path=s.source_path, + mount_folder=(work_dir / s.target_path).as_posix(), + configuration=s.configuration, + readonly=s.readonly, + config=self.nb_config, + name=None, + ) + for s in body.cloudstorage or [] + if s.storage_id is not None + } + # NOTE: Check the cloud storage in the request body and if any match + # then overwrite the project's cloud storages + # NOTE: Cloud storages in the session launch request body that are not from the DB are ignored + for csr_id, csr in cloud_storage_request.items(): + if csr_id in cloud_storage: + cloud_storage[csr_id] = csr # repositories = [Repository(i.url, branch=i.branch, commit_sha=i.commit_sha) for i in body.repositories] repositories = [Repository(url=i) for i in project.repositories] + secrets_to_create: list[V1Secret] = [] server = Renku2UserServer( user=user, image=image, @@ -825,7 +861,7 @@ async def _handler( server_options=parsed_server_options, environment_variables={}, user_secrets=user_secrets, - cloudstorage=cloud_storage, + cloudstorage=[i for i in cloud_storage.values()], k8s_client=self.nb_config.k8s_v2_client, workspace_mount_path=work_dir, work_dir=work_dir, @@ -835,6 +871,14 @@ async def _handler( is_image_private=False, internal_gitlab_user=internal_gitlab_user, ) + # Generate the cloud storage secrets + data_sources: list[DataSource] = [] + for ics, cs in enumerate(cloud_storage.values()): + secret_name = f"{server_name}-ds-{ics}" + secrets_to_create.append(cs.secret(secret_name, 
server.k8s_client.preferred_namespace)) + data_sources.append( + DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True)) + ) cert_init, cert_vols = init_containers.certificates_container(self.nb_config) session_init_containers = [InitContainer.model_validate(self.nb_config.k8s_v2_client.sanitize(cert_init))] extra_volumes = [ExtraVolume.model_validate(self.nb_config.k8s_v2_client.sanitize(i)) for i in cert_vols] @@ -868,7 +912,6 @@ async def _handler( metadata=Metadata(name=server_name, annotations=annotations), spec=AmaltheaSessionSpec( codeRepositories=[], - dataSources=[], hibernated=False, session=Session( image=image, @@ -915,13 +958,14 @@ async def _handler( type=AuthenticationType.oauth2proxy if isinstance(user, AuthenticatedAPIUser) else AuthenticationType.token, - secretRef=SecretRef(name=server_name, key="auth", adopt=True), + secretRef=SecretRefKey(name=server_name, key="auth", adopt=True), extraVolumeMounts=[ ExtraVolumeMount(name="renku-authorized-emails", mountPath="/authorized_emails") ] if isinstance(user, AuthenticatedAPIUser) else [], ), + dataSources=data_sources, ), ) parsed_proxy_url = urlparse(urljoin(server.server_url + "/", "oauth2")) @@ -952,12 +996,14 @@ async def _handler( "verbose": True, } ) - secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data) - secret = await self.nb_config.k8s_v2_client.create_secret(secret) + secrets_to_create.append(V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)) + for s in secrets_to_create: + await self.nb_config.k8s_v2_client.create_secret(s) try: manifest = await self.nb_config.k8s_v2_client.create_server(manifest, user.id) except Exception: - await self.nb_config.k8s_v2_client.delete_secret(secret.metadata.name) + for s in secrets_to_create: + await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name) raise errors.ProgrammingError(message="Could not start the amalthea session") return 
json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201) diff --git a/components/renku_data_services/notebooks/cr_amalthea_session.py b/components/renku_data_services/notebooks/cr_amalthea_session.py index 16aa355e0..a4c2e3fd9 100644 --- a/components/renku_data_services/notebooks/cr_amalthea_session.py +++ b/components/renku_data_services/notebooks/cr_amalthea_session.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: -# timestamp: 2024-09-04T22:45:28+00:00 +# timestamp: 2024-09-04T21:22:45+00:00 from __future__ import annotations diff --git a/components/renku_data_services/notebooks/crs.py b/components/renku_data_services/notebooks/crs.py index 206a32f96..76ea08dc3 100644 --- a/components/renku_data_services/notebooks/crs.py +++ b/components/renku_data_services/notebooks/crs.py @@ -32,6 +32,8 @@ from renku_data_services.notebooks.cr_amalthea_session import Model as _ASModel from renku_data_services.notebooks.cr_amalthea_session import Resources3 as Resources from renku_data_services.notebooks.cr_amalthea_session import Secret1 as SecretAsVolume +from renku_data_services.notebooks.cr_amalthea_session import SecretRef as SecretRefKey +from renku_data_services.notebooks.cr_amalthea_session import SecretRef1 as SecretRefWhole from renku_data_services.notebooks.cr_amalthea_session import Spec as AmaltheaSessionSpec from renku_data_services.notebooks.cr_amalthea_session import Type as AuthenticationType from renku_data_services.notebooks.cr_amalthea_session import Type1 as CodeRepositoryType diff --git a/components/renku_data_services/storage/db.py b/components/renku_data_services/storage/db.py index 583331d5f..9fc4bda63 100644 --- a/components/renku_data_services/storage/db.py +++ b/components/renku_data_services/storage/db.py @@ -67,7 +67,7 @@ async def get_storage( stmt = select(schemas.CloudStorageORM) if project_id is not None: - stmt = stmt.where(schemas.CloudStorageORM.project_id == project_id) + stmt = 
stmt.where(schemas.CloudStorageORM.project_id == str(project_id)) if id is not None: stmt = stmt.where(schemas.CloudStorageORM.storage_id == id) if name is not None: