[Feature] Add SDK methods for individual multipart upload operations #801

Open · wants to merge 5 commits into base: main
3 changes: 2 additions & 1 deletion .codegen/__init__.py.tmpl
Author:
This is one of the files that has actual handwritten changes.

@@ -3,6 +3,7 @@ import databricks.sdk.dbutils as dbutils
from databricks.sdk.credentials_provider import CredentialsStrategy

from databricks.sdk.mixins.files import DbfsExt
+from databricks.sdk.mixins.files import FilesExt
from databricks.sdk.mixins.compute import ClustersExt
from databricks.sdk.mixins.workspace import WorkspaceExt
from databricks.sdk.mixins.open_ai_client import ServingEndpointsExt
@@ -18,7 +19,7 @@ from typing import Optional
"google_credentials" "google_service_account" }}

{{- define "api" -}}
{{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
{{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "FilesAPI" "FilesExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
{{- $genApi := concat .PascalName "API" -}}
{{- getOrDefault $mixins $genApi $genApi -}}
{{- end -}}
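
This template change registers the new mixin: it maps the generated FilesAPI onto FilesExt, exactly the way the existing mixins are wired in. As a minimal sketch (the class and attribute names below are assumptions about the codegen output, not taken from this diff), the rendered databricks/sdk/__init__.py would then expose the extended class roughly like this:

    from databricks.sdk.mixins.files import FilesExt

    class WorkspaceClient:
        def __init__(self, api_client):
            # The $mixins lookup substitutes "FilesExt" where plain
            # "FilesAPI" would otherwise be generated.
            self._files = FilesExt(api_client)

        @property
        def files(self) -> FilesExt:
            return self._files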
6 changes: 3 additions & 3 deletions databricks/sdk/__init__.py

Some generated files are not rendered by default.

174 changes: 169 additions & 5 deletions databricks/sdk/mixins/files.py
Author:
This is one of the files that has actual handwritten changes.

@@ -8,15 +8,17 @@
import sys
from abc import ABC, abstractmethod
from collections import deque
+from enum import Enum
from io import BytesIO
from types import TracebackType
-from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Generator, Iterable,
-                    Iterator, Type, Union)
+from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Dict, Generator, Iterable,
+                    Iterator, List, Optional, Type, Union)
from urllib import parse

from .._property import _cached_property
from ..errors import NotFound
from ..service import files
+from ..service._internal import _escape_multi_segment_path_parameter

if TYPE_CHECKING:
from _typeshed import Self
@@ -40,9 +42,12 @@ def __init__(self,
self._api = api
self._path = path
if write and read: raise IOError(f'can open either for reading or writing')
-        if read: self._status = api.get_status(path)
-        elif write: self._created = api.create(path, overwrite=overwrite)
-        else: raise IOError(f'need to open either for reading or writing')
+        if read:
+            self._status = api.get_status(path)
+        elif write:
+            self._created = api.create(path, overwrite=overwrite)
+        else:
+            raise IOError(f'need to open either for reading or writing')

def __enter__(self) -> Self:
return self
@@ -636,3 +641,162 @@ def delete(self, path: str, *, recursive=False):
if p.is_dir and not recursive:
raise IOError('deleting directories requires recursive flag')
p.delete(recursive=recursive)


class FilesExt(files.FilesAPI):
"""Extends the FilesAPI with support for complex multipart upload/download operations & more robust file I/O"""
__doc__ = files.FilesAPI.__doc__

class _FileTransferBackend(Enum):
DB_FILES_API = 1
PRESIGNED_URLS = 2

class _UploadSubOperation(Enum):
UPLOAD_SIMPLE = 1
UPLOAD_MULTIPART_CREATE = 2
UPLOAD_MULTIPART_UPLOAD_PART = 3
UPLOAD_MULTIPART_COMPLETE = 4

# Default backend for performing each upload sub-operation.
_DEFAULT_OPERATIONS = {
_UploadSubOperation.UPLOAD_SIMPLE: _FileTransferBackend.DB_FILES_API,
_UploadSubOperation.UPLOAD_MULTIPART_CREATE: _FileTransferBackend.DB_FILES_API,
_UploadSubOperation.UPLOAD_MULTIPART_UPLOAD_PART: _FileTransferBackend.PRESIGNED_URLS,
_UploadSubOperation.UPLOAD_MULTIPART_COMPLETE: _FileTransferBackend.DB_FILES_API,
}

    def __init__(self, api_client):
        # The generated FilesAPI.__init__ already stores the raw API client as
        # self._api, which the REST calls below rely on; wrapping it in another
        # FilesAPI instance would leave self._api without a .do() method.
        super().__init__(api_client)

def multipart_upload_create(self,
file_path: str,
*,
file_size_bytes: Optional[int] = None,
                                backend: Optional[_FileTransferBackend] = None) -> "MultipartUploadCreate":
"""Create a multipart upload session, returning the session token and part size."""
if not backend:
backend = self._DEFAULT_OPERATIONS[FilesExt._UploadSubOperation.UPLOAD_MULTIPART_CREATE]

if backend == self._FileTransferBackend.DB_FILES_API:
return self._files_multipart_upload_create(file_path, file_size_bytes=file_size_bytes)
else:
raise NotImplementedError(f"Backend {backend} not yet supported for multipart upload create")

def _files_multipart_upload_create(self,
file_path: str,
*,
                                       file_size_bytes: Optional[int] = None) -> "MultipartUploadCreate":
"""Create a multipart upload session."""

headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}

        # NOTE: the query parameter name for the optional size hint is assumed
        # here; the service must receive it as a named parameter rather than a
        # bare number appended to the action value.
        url = f'/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}?action=initiate-upload'
        if file_size_bytes is not None:
            url += f'&file_size={file_size_bytes}'
        resp = self._api.do('POST', url, headers=headers)

return MultipartUploadCreate(resp["session_token"], resp["part_size"])

def multipart_upload_complete(self,
file_path: str,
session_token: str,
etags: List[str],
backend: Optional[_FileTransferBackend] = None):
"""Complete a multipart upload session, writing to the path."""
if not backend:
backend = self._DEFAULT_OPERATIONS[FilesExt._UploadSubOperation.UPLOAD_MULTIPART_COMPLETE]

if backend == self._FileTransferBackend.DB_FILES_API:
return self._files_multipart_upload_complete(file_path, session_token, etags)
else:
raise NotImplementedError(f"Backend {backend} not yet supported for multipart upload complete")

    def _files_multipart_upload_complete(self, file_path: str, session_token: str, etags: List[str]):
        """Complete a multipart upload session, writing to the path."""

        headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}

        self._api.do(
            'POST',
            f'/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}?action=complete-upload&session_token={parse.quote(session_token)}',
            headers=headers,
            body={'etags': etags})

def multipart_upload_create_part_urls(self, session_token: str, *, page_token: str,
                                          page_size: int) -> "MultipartUploadCreatePartUrlsResponse":
"""Request a set of presigned URLs for uploading parts of a file in a multipart upload session."""

headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}
body = {'session_token': session_token, 'page_token': page_token, 'page_size': page_size}
"""
Expected response of form {
"upload_part_urls": [
{
"method": "PUT",
"url": str,
"headers": [{"name": "Content-Range", "value": "bytes=0-1234/1235"}, ...]
},
...
],
"next_page_token": str
}
"""
resp = self._api.do('POST', f'/api/2.0/fs/create-upload-part-urls', headers=headers, body=body)

        presigned_urls = [
            PresignedUrl(u["method"], u["url"], u["headers"], ["Content-Range"])
            for u in resp["upload_part_urls"]
        ]
        return MultipartUploadCreatePartUrlsResponse(presigned_urls, resp["next_page_token"])

    def execute_presigned_url_request(self,
                                      presigned_url: "PresignedUrl",
                                      headers: Optional[Dict[str, str]] = None,
                                      data: Optional[BinaryIO] = None):
        """Execute a request to a presigned URL.

        :param presigned_url: PresignedUrl: The presigned URL to execute, as returned by the API
        :param headers: Optional[Dict[str, str]]: Additional headers specified by the client
        :param data: Optional[BinaryIO]: Data to be sent in the request body, sent as a BinaryIO stream that will be read to its completion

        Returns a tuple of (response body, response headers).
        """
        headers = headers or {}
        if not presigned_url.all_client_headers_populated(headers.keys()):
            # TODO: Move to a dedicated exception type
            raise ValueError("Not all client-provided headers are populated")

        request_headers = {**presigned_url.headers, **headers}
        resp_headers = {}
resp = self._api.do(presigned_url.method,
presigned_url.url,
headers=request_headers,
data=data,
response_headers=resp_headers)
return (resp, resp_headers)


class PresignedUrl:
"""Represents all information needed to execute a presigned URL request"""

def __init__(self, method: str, url: str, headers: List[Dict[str, str]],
headers_populated_by_client: List[str]):
self.method = method
self.url = url
self.headers_populated_by_client = set(headers_populated_by_client)
self.headers = {h["name"]: h["value"] for h in headers}

    def all_client_headers_populated(self, user_headers: Iterable[str]) -> bool:
        return self.headers_populated_by_client.issubset(user_headers)


class MultipartUploadCreatePartUrlsResponse:
"""Represents the response of a request for presigned URLs for uploading parts of a file in a multipart upload session."""

def __init__(self, upload_part_urls: List[PresignedUrl], next_page_token: str):
self.upload_part_urls = upload_part_urls
self.next_page_token = next_page_token


class MultipartUploadCreate:
"""Represents the response to an initiated multipart upload session."""

def __init__(self, session_token: str, part_size: int):
self.session_token = session_token
self.part_size = part_size
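
Taken together, these methods let a caller drive a multipart upload step by step. Below is a minimal sketch, not part of the PR: it assumes a configured WorkspaceClient whose w.files attribute is the FilesExt mixin, assumes an empty page_token is accepted for the first page, assumes all part URLs fit in a single page, and uses an illustrative volume path and an assumed "etag" response-header name.

    import io
    import math

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()
    data = b"x" * (10 * 1024 * 1024)                 # illustrative 10 MiB payload
    path = "/Volumes/main/default/vol/big-file.bin"  # illustrative destination

    # 1. Initiate the session; the server dictates the part size.
    created = w.files.multipart_upload_create(path, file_size_bytes=len(data))
    num_parts = math.ceil(len(data) / created.part_size)

    # 2. Fetch presigned part URLs and upload one part per URL.
    etags = []
    offset = 0
    page = w.files.multipart_upload_create_part_urls(
        created.session_token, page_token="", page_size=num_parts)
    for part_url in page.upload_part_urls:
        chunk = data[offset:offset + created.part_size]
        # Content-Range is the header this API declares as client-populated;
        # the exact value format the service expects is assumed here.
        headers = {"Content-Range": f"bytes {offset}-{offset + len(chunk) - 1}/{len(data)}"}
        _, resp_headers = w.files.execute_presigned_url_request(
            part_url, headers=headers, data=io.BytesIO(chunk))
        etags.append(resp_headers.get("etag", ""))  # header name assumed
        offset += len(chunk)

    # 3. Commit the parts to the destination path.
    w.files.multipart_upload_complete(path, created.session_token, etags)

A production caller would also follow next_page_token across pages and retry failed parts; exposing the individual operations so callers can own that orchestration is the point of this change.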
52 changes: 26 additions & 26 deletions databricks/sdk/service/apps.py

Some generated files are not rendered by default.
