Skip to content

Commit

Permalink
fix: correct implementation for multi-arch images
Browse files Browse the repository at this point in the history
  • Loading branch information
olevski committed Dec 9, 2024
1 parent 03262ec commit fd7e4d9
Showing 1 changed file with 66 additions and 16 deletions.
82 changes: 66 additions & 16 deletions renku_notebooks/api/classes/image.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Used to get information about docker images used in jupyter servers."""

import base64
import re
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Optional, cast
from typing import Any, Optional, Self, cast

import requests
from werkzeug.datastructures import WWWAuthenticate
Expand All @@ -12,11 +14,18 @@


class ManifestTypes(Enum):
"""The mime types for docker image manifests."""

docker_v2: str = "application/vnd.docker.distribution.manifest.v2+json"
docker_v2_list: str = "application/vnd.docker.distribution.manifest.list.v2+json"
oci_v1_manifest: str = "application/vnd.oci.image.manifest.v1+json"
oci_v1_index: str = "application/vnd.oci.image.index.v1+json"


DEFAULT_PLATFORM_ARCHITECTURE = "amd64"
DEFAULT_PLATFORM_OS = "linux"


@dataclass
class ImageRepoDockerAPI:
"""Used to query the docker image repository API.
Expand All @@ -41,16 +50,25 @@ def _get_docker_token(self, image: "Image") -> Optional[str]:
# the request status code and header are not what is expected
return None
www_auth = WWWAuthenticate.from_header(auth_req.headers["Www-Authenticate"])
if not www_auth:
return None
params = {**www_auth.parameters}
realm = params.pop("realm")
if not realm:
return None
headers = {"Accept": "application/json"}
if self.oauth2_token:
creds = base64.urlsafe_b64encode(f"oauth2:{self.oauth2_token}".encode()).decode()
headers["Authorization"] = f"Basic {creds}"
token_req = requests.get(realm, params=params, headers=headers)
return token_req.json().get("token")
return str(token_req.json().get("token"))

def get_image_manifest(self, image: "Image") -> Optional[dict[str, Any]]:
def get_image_manifest(
self,
image: "Image",
platform_architecture: str = DEFAULT_PLATFORM_ARCHITECTURE,
platform_os: str = DEFAULT_PLATFORM_OS,
) -> Optional[dict[str, Any]]:
"""Query the docker API to get the manifest of an image."""
if image.hostname != self.hostname:
raise ImageParseError(
Expand All @@ -68,17 +86,43 @@ def get_image_manifest(self, image: "Image") -> Optional[dict[str, Any]]:
if res.status_code != 200:
headers["Accept"] = ManifestTypes.oci_v1_index.value
res = requests.get(image_digest_url, headers=headers)
if res.status_code == 200:
index_parsed = res.json()
manifest = next(
(man for man in index_parsed.get("manifests", []) if man.get("platform", {}).get("os") == "linux"),
None,
)
manifest = cast(dict[str, Any] | None, manifest)
return manifest
if res.status_code != 200:
return None
return res.json()

content_type = res.headers.get("Content-Type")
if content_type in [ManifestTypes.docker_v2_list.value, ManifestTypes.oci_v1_index.value]:
index_parsed = res.json()

def platform_matches(manifest: dict[str, Any]) -> bool:
platform: dict[str, Any] = manifest.get("platform", {})
return platform.get("architecture") == platform_architecture and platform.get("os") == platform_os

manifest: dict[str, Any] = next(filter(platform_matches, index_parsed.get("manifests", [])), {})
image_digest: str | None = manifest.get("digest")
if not manifest or not image_digest:
return None
image_digest_url = f"https://{image.hostname}/v2/{image.name}/manifests/{image_digest}"
media_type = manifest.get("mediaType")
headers["Accept"] = ManifestTypes.docker_v2.value
if media_type in [
ManifestTypes.docker_v2.value,
ManifestTypes.oci_v1_manifest.value,
]:
headers["Accept"] = media_type
res = requests.get(image_digest_url, headers=headers)
if res.status_code != 200:
headers["Accept"] = ManifestTypes.oci_v1_manifest.value
res = requests.get(image_digest_url, headers=headers)
if res.status_code != 200:
return None

if res.headers.get("Content-Type") not in [
ManifestTypes.docker_v2.value,
ManifestTypes.oci_v1_manifest.value,
]:
return None

return cast(dict[str, Any], res.json())

def image_exists(self, image: "Image") -> bool:
"""Check the docker repo API if the image exists."""
Expand All @@ -102,7 +146,7 @@ def get_image_config(self, image: "Image") -> Optional[dict[str, Any]]:
)
if res.status_code != 200:
return None
return res.json()
return cast(dict[str, Any], res.json())

def image_workdir(self, image: "Image") -> Optional[Path]:
"""Query the docker API to get the workdir of an image."""
Expand All @@ -118,18 +162,23 @@ def image_workdir(self, image: "Image") -> Optional[Path]:
return Path(workdir)

def with_oauth2_token(self, oauth2_token: str) -> "ImageRepoDockerAPI":
"""Return a docker API instance with the token as authentication."""
return ImageRepoDockerAPI(self.hostname, oauth2_token)


@dataclass
class Image:
"""Representation of a docker image."""

hostname: str
name: str
tag: str

@classmethod
def from_path(cls, path: str):
def build_re(*parts):
def from_path(cls, path: str) -> Self:
"""Create an image from a path like 'nginx:1.28'."""

def build_re(*parts: str) -> re.Pattern:
"""Assemble the regex."""
return re.compile(r"^" + r"".join(parts) + r"$")

Expand All @@ -142,7 +191,7 @@ def build_re(*parts):
tag = r"(?P<tag>(?<=:)[a-zA-Z0-9\._\-]{1,}(?=$))"

# a list of tuples with (regex, defaults to fill in case of match)
regexes = [
regexes: list[tuple[re.Pattern, dict[str, str]]] = [
# nginx
(
build_re(docker_image),
Expand Down Expand Up @@ -207,4 +256,5 @@ def build_re(*parts):
raise ImageParseError(f"Cannot parse the image {path}")

def repo_api(self) -> ImageRepoDockerAPI:
"""Get the docker API from the image."""
return ImageRepoDockerAPI(self.hostname)

0 comments on commit fd7e4d9

Please sign in to comment.