Skip to content

Commit

Permalink
Implement tus uploads in Pulsar client.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jan 12, 2024
1 parent 5d09683 commit 15c0d66
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 1 deletion.
7 changes: 7 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ History

.. to_doc
---------------------
0.15.6.dev0
---------------------
* Allow tus uploads to Galaxy.
`Pull Request 351`_


---------------------
0.15.5 (2023-09-15)
---------------------
Expand Down
2 changes: 1 addition & 1 deletion pulsar/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.15.5'
__version__ = '0.15.6.dev0'

PROJECT_NAME = "pulsar"
PROJECT_OWNER = PROJECT_USERAME = "galaxyproject"
Expand Down
33 changes: 33 additions & 0 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
scp_get_file,
scp_post_file,
)
from .transport.tus import (
tus_upload_file,
)
from .util import (
copy_to_path,
directory_files,
Expand Down Expand Up @@ -482,6 +485,34 @@ def write_from_path(self, pulsar_path):
post_file(self.url, pulsar_path)


class RemoteTransferTusAction(BaseAction):
""" This action indicates the Pulsar server should transfer the file before
execution via one of the remote transfer implementations. This is like a TransferAction, but
it indicates the action requires network access to the staging server and TUS
will be used for the transfer
"""
inject_url = True
action_type = "remote_transfer_tus"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
self.url = url

def to_dict(self):
return self._extend_base_dict(url=self.url)

@classmethod
def from_dict(cls, action_dict):
return RemoteTransferAction(source=action_dict["source"], url=action_dict["url"])

def write_to_path(self, path):
get_file(self.url, path)

def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
Expand Down Expand Up @@ -635,6 +666,7 @@ def write_to_path(self, path):
DICTIFIABLE_ACTION_CLASSES = [
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
MessageAction,
RsyncTransferAction,
ScpTransferAction,
Expand Down Expand Up @@ -817,6 +849,7 @@ def unstructured_map(self, path):
CopyAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
RemoteObjectStoreCopyAction,
RsyncTransferAction,
ScpTransferAction,
Expand Down
50 changes: 50 additions & 0 deletions pulsar/client/transport/tus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
import re
from typing import Dict
from urllib.parse import urlparse

import requests

try:
from tusclient import client
tus_client_available = True
except ImportError:
tus_client_available = False

TUS_CLIENT_UNAVAILABLE_MESSAGE = \
"You are attempting to use the Tus transport with the Pulsar client but tuspy is unavailable."
DEFAULT_PULSAR_TUS_CHUNK_SIZE = 10**7
PULSAR_TUS_CHUNK_SIZE = int(os.getenv('PULSAR_TUS_CHUNK_SIZE', DEFAULT_PULSAR_TUS_CHUNK_SIZE))


def tus_upload_file(url: str, path: str) -> None:
if not tus_client_available:
raise Exception(TUS_CLIENT_UNAVAILABLE_MESSAGE)

storage = None
metadata: Dict[str, str] = {}

headers: Dict[str, str] = {}
tus_url = find_tus_endpoint(url)
my_client = client.TusClient(tus_url, headers=headers)
uploader = my_client.uploader(path, metadata=metadata, url_storage=storage)
uploader.chunk_size = PULSAR_TUS_CHUNK_SIZE
uploader.upload()
upload_session_url = uploader.url
assert upload_session_url
tus_session_id = upload_session_url.rsplit("/", 1)[1]
# job_key and such are encoded in the URL but this route expects a POST body
# and if it has one Galaxy sticks the URL parameters into the "payload" object
# for the controller. So encoded session_id in the POST body - it probably
# all belongs there anyway.
post_response = requests.post(url, data={"session_id": tus_session_id})
post_response.raise_for_status()


def find_tus_endpoint(job_files_endpoint: str) -> str:
parsed = urlparse(job_files_endpoint)
job_files_url_path = parsed.path
tus_endpoint = re.sub(r"jobs/[^/]*/files", "job_files/resumable_upload", job_files_url_path, 1)

new_url = parsed._replace(path=tus_endpoint)
return new_url.geturl()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ paramiko
typing-extensions
pydantic-tes>=0.1.5
pyjwt
tuspy

## Uncomment if using DRMAA queue manager.
#drmaa
Expand Down
7 changes: 7 additions & 0 deletions test/client_transport_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pulsar.client.transport.curl import PycurlTransport
from pulsar.client.transport.curl import post_file
from pulsar.client.transport.curl import get_file
from pulsar.client.transport.tus import find_tus_endpoint
from pulsar.client.transport import get_transport

from .test_utils import files_server
Expand Down Expand Up @@ -107,6 +108,12 @@ def test_curl_problems():
assert exception_raised


def test_find_tus_endpoint():
galaxy_endpoint = "http://subdomain.galaxy.org/prefix/api/jobs/1231sdfsq23e/files?job_key=34"
tus_endpoint = find_tus_endpoint(galaxy_endpoint)
assert tus_endpoint == "http://subdomain.galaxy.org/prefix/api/job_files/resumable_upload?job_key=34"


def test_get_transport():
assert type(get_transport(None, FakeOsModule("1"))) == PycurlTransport
assert type(get_transport(None, FakeOsModule("TRUE"))) == PycurlTransport
Expand Down

0 comments on commit 15c0d66

Please sign in to comment.