diff --git a/HISTORY.rst b/HISTORY.rst index 872b8455..70ad05f9 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -5,6 +5,13 @@ History .. to_doc +--------------------- +0.15.6.dev0 +--------------------- +* Allow tus uploads to Galaxy. + `Pull Request 351`_ + + --------------------- 0.15.5 (2023-09-15) --------------------- diff --git a/pulsar/__init__.py b/pulsar/__init__.py index c5df297c..00423bd2 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.15.5' +__version__ = '0.15.6.dev0' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index 2b0cc769..ce2f54ec 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -36,6 +36,9 @@ scp_get_file, scp_post_file, ) +from .transport.tus import ( + tus_upload_file, +) from .util import ( copy_to_path, directory_files, @@ -482,6 +485,34 @@ def write_from_path(self, pulsar_path): post_file(self.url, pulsar_path) +class RemoteTransferTusAction(BaseAction): + """ This action indicates the Pulsar server should transfer the file before + execution via one of the remote transfer implementations. This is like a TransferAction, but + it indicates the action requires network access to the staging server and TUS + will be used for the transfer + """ + inject_url = True + action_type = "remote_transfer_tus" + staging = STAGING_ACTION_REMOTE + + def __init__(self, source, file_lister=None, url=None): + super().__init__(source, file_lister=file_lister) + self.url = url + + def to_dict(self): + return self._extend_base_dict(url=self.url) + + @classmethod + def from_dict(cls, action_dict): + return RemoteTransferAction(source=action_dict["source"], url=action_dict["url"]) + + def write_to_path(self, path): + get_file(self.url, path) + + def write_from_path(self, pulsar_path): + tus_upload_file(self.url, pulsar_path) + + class RemoteObjectStoreCopyAction(BaseAction): """ """ @@ -635,6 +666,7 @@ def write_to_path(self, path): DICTIFIABLE_ACTION_CLASSES = [ RemoteCopyAction, RemoteTransferAction, + RemoteTransferTusAction, MessageAction, RsyncTransferAction, ScpTransferAction, @@ -817,6 +849,7 @@ def unstructured_map(self, path): CopyAction, RemoteCopyAction, RemoteTransferAction, + RemoteTransferTusAction, RemoteObjectStoreCopyAction, RsyncTransferAction, ScpTransferAction, diff --git a/pulsar/client/transport/tus.py b/pulsar/client/transport/tus.py new file mode 100644 index 00000000..6846c81b --- /dev/null +++ b/pulsar/client/transport/tus.py @@ -0,0 +1,50 @@ +import os +import re +from typing import Dict +from urllib.parse import urlparse + +import requests + +try: + from tusclient import client + tus_client_available = True +except ImportError: + tus_client_available = False + +TUS_CLIENT_UNAVAILABLE_MESSAGE = \ + "You are attempting to use the Tus transport with the Pulsar client but tuspy is unavailable." +DEFAULT_PULSAR_TUS_CHUNK_SIZE = 10**7 +PULSAR_TUS_CHUNK_SIZE = int(os.getenv('PULSAR_TUS_CHUNK_SIZE', DEFAULT_PULSAR_TUS_CHUNK_SIZE)) + + +def tus_upload_file(url: str, path: str) -> None: + if not tus_client_available: + raise Exception(TUS_CLIENT_UNAVAILABLE_MESSAGE) + + storage = None + metadata: Dict[str, str] = {} + + headers: Dict[str, str] = {} + tus_url = find_tus_endpoint(url) + my_client = client.TusClient(tus_url, headers=headers) + uploader = my_client.uploader(path, metadata=metadata, url_storage=storage) + uploader.chunk_size = PULSAR_TUS_CHUNK_SIZE + uploader.upload() + upload_session_url = uploader.url + assert upload_session_url + tus_session_id = upload_session_url.rsplit("/", 1)[1] + # job_key and such are encoded in the URL but this route expects a POST body + # and if it has one Galaxy sticks the URL parameters into the "payload" object + # for the controller. So encoded session_id in the POST body - it probably + # all belongs there anyway. + post_response = requests.post(url, data={"session_id": tus_session_id}) + post_response.raise_for_status() + + +def find_tus_endpoint(job_files_endpoint: str) -> str: + parsed = urlparse(job_files_endpoint) + job_files_url_path = parsed.path + tus_endpoint = re.sub(r"jobs/[^/]*/files", "job_files/resumable_upload", job_files_url_path, 1) + + new_url = parsed._replace(path=tus_endpoint) + return new_url.geturl() diff --git a/requirements.txt b/requirements.txt index de27a41c..1db30fa4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ paramiko typing-extensions pydantic-tes>=0.1.5 pyjwt +tuspy ## Uncomment if using DRMAA queue manager. #drmaa diff --git a/test/client_transport_test.py b/test/client_transport_test.py index 200a7ace..8faba31e 100644 --- a/test/client_transport_test.py +++ b/test/client_transport_test.py @@ -8,6 +8,7 @@ from pulsar.client.transport.curl import PycurlTransport from pulsar.client.transport.curl import post_file from pulsar.client.transport.curl import get_file +from pulsar.client.transport.tus import find_tus_endpoint from pulsar.client.transport import get_transport from .test_utils import files_server @@ -107,6 +108,12 @@ def test_curl_problems(): assert exception_raised +def test_find_tus_endpoint(): + galaxy_endpoint = "http://subdomain.galaxy.org/prefix/api/jobs/1231sdfsq23e/files?job_key=34" + tus_endpoint = find_tus_endpoint(galaxy_endpoint) + assert tus_endpoint == "http://subdomain.galaxy.org/prefix/api/job_files/resumable_upload?job_key=34" + + def test_get_transport(): assert type(get_transport(None, FakeOsModule("1"))) == PycurlTransport assert type(get_transport(None, FakeOsModule("TRUE"))) == PycurlTransport