Skip to content

Commit

Permalink
feat: add cloud storage via rclone
Browse files Browse the repository at this point in the history
  • Loading branch information
olevski committed Sep 6, 2024
1 parent 4685981 commit 4cfd95a
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 62 deletions.
1 change: 1 addition & 0 deletions bases/renku_data_services/data_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
nb_config=config.nb_config,
internal_gitlab_authenticator=config.gitlab_authenticator,
git_repo=config.git_repositories_repo,
rp_repo=config.rp_repo,
)
notebooks_new = NotebooksNewBP(
name="notebooks",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
class ICloudStorageRequest(Protocol):
"""The abstract class for cloud storage."""

exists: bool
mount_folder: str
source_folder: str
bucket: str
source_path: str

def get_manifest_patch(
self,
base_name: str,
namespace: str,
labels: dict[str, str] = {},
annotations: dict[str, str] = {},
labels: dict[str, str] | None = None,
annotations: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
"""The patches applied to a jupyter server to insert the storage in the session."""
...
121 changes: 78 additions & 43 deletions components/renku_data_services/notebooks/api/schemas/cloud_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

from configparser import ConfigParser
from io import StringIO
from pathlib import Path
from typing import Any, Optional, Self
from pathlib import PurePosixPath
from typing import Any, Final, Optional, Self

from kubernetes import client
from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema

from renku_data_services.base_models import APIUser
from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest
from renku_data_services.notebooks.config import _NotebooksConfig

_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization


class RCloneStorageRequest(Schema):
"""Request for RClone based storage."""
Expand All @@ -36,6 +39,8 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None:
class RCloneStorage(ICloudStorageRequest):
"""RClone based storage."""

pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName"

def __init__(
self,
source_path: str,
Expand All @@ -60,7 +65,7 @@ async def storage_from_schema(
user: APIUser,
internal_gitlab_user: APIUser,
project_id: int,
work_dir: Path,
work_dir: PurePosixPath,
config: _NotebooksConfig,
) -> Self:
"""Create storage object from request."""
Expand Down Expand Up @@ -92,8 +97,73 @@ async def storage_from_schema(
await config.storage_validator.validate_storage_configuration(configuration, source_path)
return cls(source_path, configuration, readonly, mount_folder, name, config)

def pvc(
    self,
    base_name: str,
    namespace: str,
    labels: dict[str, str] | None = None,
    annotations: dict[str, str] | None = None,
) -> client.V1PersistentVolumeClaim:
    """The PVC for mounting cloud storage.

    The csi-rclone driver locates the rclone configuration through the
    secret-name annotation placed on this claim; caller-supplied
    annotations/labels are merged in and may override the defaults.
    """
    merged_annotations = {self.pvc_secret_annotation_name: base_name}
    merged_annotations.update(annotations or {})
    merged_labels = {"name": base_name}
    merged_labels.update(labels or {})
    # Read-only storage gets a read-only access mode on the claim.
    access_mode = "ReadOnlyMany" if self.readonly else "ReadWriteMany"
    claim_spec = client.V1PersistentVolumeClaimSpec(
        access_modes=[access_mode],
        resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}),
        storage_class_name=self.config.cloud_storage.storage_class,
    )
    return client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(
            name=base_name,
            namespace=namespace,
            annotations=merged_annotations,
            labels=merged_labels,
        ),
        spec=claim_spec,
    )

def volume_mount(self, base_name: str) -> client.V1VolumeMount:
    """The volume mount for cloud storage."""
    # The mount mirrors the storage's read-only flag into the container.
    mount = client.V1VolumeMount(
        name=base_name,
        mount_path=self.mount_folder,
        read_only=self.readonly,
    )
    return mount

def volume(self, base_name: str) -> client.V1Volume:
    """The volume entry for the statefulset specification."""
    # The volume is backed by the PVC of the same name created by `pvc`.
    claim_source = client.V1PersistentVolumeClaimVolumeSource(
        claim_name=base_name,
        read_only=self.readonly,
    )
    return client.V1Volume(name=base_name, persistent_volume_claim=claim_source)

def secret(
    self,
    base_name: str,
    namespace: str,
    labels: dict[str, str] | None = None,
    annotations: dict[str, str] | None = None,
) -> client.V1Secret:
    """The secret containing the configuration for the rclone csi driver.

    The remote falls back to ``base_name`` when the storage has no name.
    """
    remote_name = self.name or base_name
    secret_labels = {"name": base_name}
    secret_labels.update(labels or {})
    # Keys expected by the csi-rclone driver: remote, remotePath, configData.
    payload = {
        "remote": remote_name,
        "remotePath": self.source_path,
        "configData": self.config_string(remote_name),
    }
    return client.V1Secret(
        metadata=client.V1ObjectMeta(
            name=base_name,
            namespace=namespace,
            annotations=annotations,
            labels=secret_labels,
        ),
        string_data=payload,
    )

def get_manifest_patch(
self, base_name: str, namespace: str, labels: dict = {}, annotations: dict = {}
self,
base_name: str,
namespace: str,
labels: dict[str, str] | None = None,
annotations: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
"""Get server manifest patch."""
patches = []
Expand All @@ -104,57 +174,22 @@ def get_manifest_patch(
{
"op": "add",
"path": f"/{base_name}-pv",
"value": {
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"metadata": {
"name": base_name,
"labels": {"name": base_name},
},
"spec": {
"accessModes": ["ReadOnlyMany" if self.readonly else "ReadWriteMany"],
"resources": {"requests": {"storage": "10Gi"}},
"storageClassName": self.config.cloud_storage.storage_class,
},
},
"value": _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations)),
},
{
"op": "add",
"path": f"/{base_name}-secret",
"value": {
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": base_name,
"labels": {"name": base_name},
},
"type": "Opaque",
"stringData": {
"remote": self.name or base_name,
"remotePath": self.source_path,
"configData": self.config_string(self.name or base_name),
},
},
"value": _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations)),
},
{
"op": "add",
"path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-",
"value": {
"mountPath": self.mount_folder,
"name": base_name,
"readOnly": self.readonly,
},
"value": _sanitize_for_serialization(self.volume_mount(base_name)),
},
{
"op": "add",
"path": "/statefulset/spec/template/spec/volumes/-",
"value": {
"name": base_name,
"persistentVolumeClaim": {
"claimName": base_name,
"readOnly": self.readonly,
},
},
"value": _sanitize_for_serialization(self.volume(base_name)),
},
],
}
Expand Down
70 changes: 58 additions & 12 deletions components/renku_data_services/notebooks/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dataclasses import dataclass
from datetime import UTC, datetime
from math import floor
from pathlib import Path
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import urljoin, urlparse

Expand Down Expand Up @@ -55,6 +55,7 @@
Authentication,
AuthenticationType,
Culling,
DataSource,
ExtraContainer,
ExtraVolume,
ExtraVolumeMount,
Expand All @@ -64,7 +65,8 @@
Resources,
SecretAsVolume,
SecretAsVolumeItem,
SecretRef,
SecretRefKey,
SecretRefWhole,
Session,
SessionEnvItem,
State,
Expand All @@ -86,6 +88,7 @@
from renku_data_services.project.db import ProjectRepository
from renku_data_services.repositories.db import GitRepositoriesRepository
from renku_data_services.session.db import SessionRepository
from renku_data_services.storage.db import StorageV2Repository


@dataclass(kw_only=True)
Expand Down Expand Up @@ -415,7 +418,7 @@ async def launch_notebook_helper(
if lfs_auto_fetch is not None:
parsed_server_options.lfs_auto_fetch = lfs_auto_fetch

image_work_dir = image_repo.image_workdir(parsed_image) or Path("/")
image_work_dir = image_repo.image_workdir(parsed_image) or PurePosixPath("/")
mount_path = image_work_dir / "work"

server_work_dir = mount_path / gl_project_path
Expand All @@ -430,7 +433,7 @@ async def launch_notebook_helper(
cstorage.model_dump(),
user=user,
project_id=gl_project_id,
work_dir=server_work_dir.absolute(),
work_dir=server_work_dir,
config=nb_config,
internal_gitlab_user=internal_gitlab_user,
)
Expand Down Expand Up @@ -774,6 +777,7 @@ class NotebooksNewBP(CustomBlueprint):
project_repo: ProjectRepository
session_repo: SessionRepository
rp_repo: ResourcePoolRepository
storage_repo: StorageV2Repository

def start(self) -> BlueprintFactoryResponse:
"""Start a session with the new operator."""
Expand Down Expand Up @@ -805,17 +809,49 @@ async def _handler(
parsed_server_options = await self.nb_config.crc_validator.validate_class_storage(
user, resource_class_id, body.disk_storage
)
work_dir = Path("/home/jovyan/work")
work_dir = environment.working_directory
user_secrets: K8sUserSecrets | None = None
# if body.user_secrets:
# user_secrets = K8sUserSecrets(
# name=server_name,
# user_secret_ids=body.user_secrets.user_secret_ids,
# mount_path=body.user_secrets.mount_path,
# )
cloud_storage: list[RCloneStorage] = []
cloud_storages_db = await self.storage_repo.get_storage(
user=user, project_id=project.id, include_secrets=True
)
cloud_storage: dict[str, RCloneStorage] = {
str(s.storage_id): RCloneStorage(
source_path=s.source_path,
mount_folder=(work_dir / s.target_path).as_posix(),
configuration=s.configuration.model_dump(mode="python"),
readonly=s.readonly,
config=self.nb_config,
name=s.name,
)
for s in cloud_storages_db
}
cloud_storage_request: dict[str, RCloneStorage] = {
s.storage_id: RCloneStorage(
source_path=s.source_path,
mount_folder=(work_dir / s.target_path).as_posix(),
configuration=s.configuration,
readonly=s.readonly,
config=self.nb_config,
name=None,
)
for s in body.cloudstorage or []
if s.storage_id is not None
}
# NOTE: Check the cloud storage in the request body and if any match
# then overwrite the project's cloud storages
# NOTE: Cloud storages in the session launch request body that are not from the DB are ignored
for csr_id, csr in cloud_storage_request.items():
if csr_id in cloud_storage:
cloud_storage[csr_id] = csr
# repositories = [Repository(i.url, branch=i.branch, commit_sha=i.commit_sha) for i in body.repositories]
repositories = [Repository(url=i) for i in project.repositories]
secrets_to_create: list[V1Secret] = []
server = Renku2UserServer(
user=user,
image=image,
Expand All @@ -825,7 +861,7 @@ async def _handler(
server_options=parsed_server_options,
environment_variables={},
user_secrets=user_secrets,
cloudstorage=cloud_storage,
cloudstorage=[i for i in cloud_storage.values()],
k8s_client=self.nb_config.k8s_v2_client,
workspace_mount_path=work_dir,
work_dir=work_dir,
Expand All @@ -835,6 +871,14 @@ async def _handler(
is_image_private=False,
internal_gitlab_user=internal_gitlab_user,
)
# Generate the cloud storage secrets
data_sources: list[DataSource] = []
for ics, cs in enumerate(cloud_storage.values()):
secret_name = f"{server_name}-ds-{ics}"
secrets_to_create.append(cs.secret(secret_name, server.k8s_client.preferred_namespace))
data_sources.append(
DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True))
)
cert_init, cert_vols = init_containers.certificates_container(self.nb_config)
session_init_containers = [InitContainer.model_validate(self.nb_config.k8s_v2_client.sanitize(cert_init))]
extra_volumes = [ExtraVolume.model_validate(self.nb_config.k8s_v2_client.sanitize(i)) for i in cert_vols]
Expand Down Expand Up @@ -868,7 +912,6 @@ async def _handler(
metadata=Metadata(name=server_name, annotations=annotations),
spec=AmaltheaSessionSpec(
codeRepositories=[],
dataSources=[],
hibernated=False,
session=Session(
image=image,
Expand Down Expand Up @@ -915,13 +958,14 @@ async def _handler(
type=AuthenticationType.oauth2proxy
if isinstance(user, AuthenticatedAPIUser)
else AuthenticationType.token,
secretRef=SecretRef(name=server_name, key="auth", adopt=True),
secretRef=SecretRefKey(name=server_name, key="auth", adopt=True),
extraVolumeMounts=[
ExtraVolumeMount(name="renku-authorized-emails", mountPath="/authorized_emails")
]
if isinstance(user, AuthenticatedAPIUser)
else [],
),
dataSources=data_sources,
),
)
parsed_proxy_url = urlparse(urljoin(server.server_url + "/", "oauth2"))
Expand Down Expand Up @@ -952,12 +996,14 @@ async def _handler(
"verbose": True,
}
)
secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
secret = await self.nb_config.k8s_v2_client.create_secret(secret)
secrets_to_create.append(V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data))
for s in secrets_to_create:
await self.nb_config.k8s_v2_client.create_secret(s)
try:
manifest = await self.nb_config.k8s_v2_client.create_server(manifest, user.id)
except Exception:
await self.nb_config.k8s_v2_client.delete_secret(secret.metadata.name)
for s in secrets_to_create:
await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name)
raise errors.ProgrammingError(message="Could not start the amalthea session")

return json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: <stdin>
# timestamp: 2024-09-04T22:45:28+00:00
# timestamp: 2024-09-04T21:22:45+00:00

from __future__ import annotations

Expand Down
Loading

0 comments on commit 4cfd95a

Please sign in to comment.