Skip to content

Commit

Permalink
Implement tus uploads in Pulsar client.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jan 11, 2024
1 parent 5d09683 commit 71a88f6
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
32 changes: 32 additions & 0 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
scp_get_file,
scp_post_file,
)
from .transport.tus import (
tus_upload_file,
)
from .util import (
copy_to_path,
directory_files,
Expand Down Expand Up @@ -482,6 +485,34 @@ def write_from_path(self, pulsar_path):
post_file(self.url, pulsar_path)


class RemoteTransferTusAction(BaseAction):
    """Remote transfer action whose uploads go through the TUS protocol.

    This action indicates the Pulsar server should transfer the file before
    execution via one of the remote transfer implementations. This is like a
    TransferAction, but it indicates the action requires network access to the
    staging server and TUS will be used for the transfer.

    Downloads (``write_to_path``) use ``get_file``; uploads
    (``write_from_path``) use ``tus_upload_file``.
    """
    inject_url = True
    action_type = "remote_transfer_tus"
    staging = STAGING_ACTION_REMOTE

    def __init__(self, source, file_lister=None, url=None):
        super().__init__(source, file_lister=file_lister)
        self.url = url

    def to_dict(self):
        """Return the base action dict extended with this action's ``url``."""
        return self._extend_base_dict(url=self.url)

    @classmethod
    def from_dict(cls, action_dict):
        """Rebuild the action from a dict holding ``source`` and ``url`` keys."""
        # Construct via ``cls`` so the result is a TUS action (or subclass);
        # building a RemoteTransferAction here would silently drop the TUS
        # upload path after a serialization round trip.
        return cls(source=action_dict["source"], url=action_dict["url"])

    def write_to_path(self, path):
        """Fetch the file at ``self.url`` into the local ``path``."""
        get_file(self.url, path)

    def write_from_path(self, pulsar_path):
        """Upload the file at ``pulsar_path`` to ``self.url`` using TUS."""
        tus_upload_file(self.url, pulsar_path)


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
Expand Down Expand Up @@ -817,6 +848,7 @@ def unstructured_map(self, path):
CopyAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
RemoteObjectStoreCopyAction,
RsyncTransferAction,
ScpTransferAction,
Expand Down
49 changes: 49 additions & 0 deletions pulsar/client/transport/tus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import re
from typing import Dict
from urllib.parse import urlparse

import requests

# tuspy is an optional dependency: record whether it imported so that
# tus_upload_file can fail with a descriptive message instead of a NameError.
try:
    from tusclient import client
except ImportError:
    client = None
    tus_client_available = False
else:
    tus_client_available = True

TUS_CLIENT_UNAVAILABLE_MESSAGE = (
    "You are attempting to use the Tus transport with the Pulsar client but tuspy is unavailable."
)
# Chunk size handed to the tus uploader (presumably bytes - confirm against
# tuspy); overridable through the PULSAR_TUS_CHUNK_SIZE environment variable.
DEFAULT_PULSAR_TUS_CHUNK_SIZE = 10_000_000
PULSAR_TUS_CHUNK_SIZE = int(os.environ.get('PULSAR_TUS_CHUNK_SIZE', DEFAULT_PULSAR_TUS_CHUNK_SIZE))


def tus_upload_file(url: str, path: str) -> None:
    """Upload ``path`` via TUS, then POST the upload session id to ``url``.

    The TUS endpoint is derived from ``url`` with ``find_tus_endpoint``. Once
    the chunked upload finishes, the last path segment of the uploader's
    session URL is taken as the session id and sent to ``url`` as a
    ``session_id`` query parameter.

    :param url: job files endpoint URL; assumed to already carry a query
        string (e.g. ``?job_key=...``) since ``&session_id=`` is appended.
    :param path: local path of the file to upload.
    :raises Exception: if tuspy is not installed or the uploader reports no
        session URL after uploading.
    :raises requests.HTTPError: if the final POST returns an error status.
    """
    if not tus_client_available:
        raise Exception(TUS_CLIENT_UNAVAILABLE_MESSAGE)

    metadata: Dict[str, str] = {}
    headers: Dict[str, str] = {}
    tus_url = find_tus_endpoint(url)
    my_client = client.TusClient(tus_url, headers=headers)
    uploader = my_client.uploader(path, metadata=metadata, url_storage=None)
    uploader.chunk_size = PULSAR_TUS_CHUNK_SIZE
    uploader.upload()

    upload_session_url = uploader.url
    # Explicit check rather than ``assert`` so it still runs under ``python -O``.
    if not upload_session_url:
        raise Exception("Tus upload completed without an upload session URL.")
    tus_session_id = upload_session_url.rsplit("/", 1)[1]

    # POST to the job files URL carrying the session id. Previously this URL
    # was built but the request was sent to the bare tus endpoint instead.
    session_url = f"{url}&session_id={tus_session_id}"
    post_response = requests.post(session_url)
    post_response.raise_for_status()



def find_tus_endpoint(job_files_endpoint: str) -> str:
    """Map a job files URL onto the matching resumable-upload (TUS) URL.

    The first ``jobs/<id>/files`` segment of the URL path is replaced with
    ``job_files/resumable_upload``; every other URL component (scheme, host,
    query string, ...) is kept. A URL whose path has no such segment is
    returned unchanged.

    :param job_files_endpoint: URL of the job files endpoint.
    :returns: URL of the TUS endpoint.
    """
    parsed = urlparse(job_files_endpoint)
    # ``count`` is passed by keyword: passing it positionally is deprecated
    # since Python 3.13 and is easily mistaken for the ``flags`` argument.
    tus_path = re.sub(
        r"jobs/[^/]*/files",
        "job_files/resumable_upload",
        parsed.path,
        count=1,
    )
    return parsed._replace(path=tus_path).geturl()
7 changes: 7 additions & 0 deletions test/client_transport_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pulsar.client.transport.curl import PycurlTransport
from pulsar.client.transport.curl import post_file
from pulsar.client.transport.curl import get_file
from pulsar.client.transport.tus import find_tus_endpoint
from pulsar.client.transport import get_transport

from .test_utils import files_server
Expand Down Expand Up @@ -107,6 +108,12 @@ def test_curl_problems():
assert exception_raised


def test_find_tus_endpoint():
    """A job files URL maps to the resumable upload URL, keeping prefix and query."""
    job_files_url = "http://subdomain.galaxy.org/prefix/api/jobs/1231sdfsq23e/files?job_key=34"
    expected_url = "http://subdomain.galaxy.org/prefix/api/job_files/resumable_upload?job_key=34"
    assert find_tus_endpoint(job_files_url) == expected_url


def test_get_transport():
assert type(get_transport(None, FakeOsModule("1"))) == PycurlTransport
assert type(get_transport(None, FakeOsModule("TRUE"))) == PycurlTransport
Expand Down

0 comments on commit 71a88f6

Please sign in to comment.