Skip to content

Commit

Permalink
Draft of JSON manifest collector
Browse files Browse the repository at this point in the history
This should be the general strategy for collecting input and output
files for ARC, DIRAC, AWS batch etc.
  • Loading branch information
mvdbeek committed Nov 11, 2024
1 parent 198dc8a commit 8d4e7e9
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 1 deletion.
42 changes: 42 additions & 0 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from os.path import (
abspath,
basename,
commonpath,
dirname,
exists,
join,
Expand Down Expand Up @@ -190,6 +191,9 @@ def __init__(self, client=None, config=None):
self.ssh_port = config.get("ssh_port", None)
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)
self.actions = []
# Might want to make the working directory available here so that we know where to place archive
# for archive action

def action(self, source, type, mapper=None):
path = source.get("path", None)
Expand All @@ -202,8 +206,12 @@ def action(self, source, type, mapper=None):
action_kwds = mapper.action_kwds
action = action_class(source, file_lister=file_lister, **action_kwds)
self.__process_action(action, type)
self.actions.append(action)
return action

def finalize(self):
return [_ for _ in (action.finalize() for action in self.actions) if _]

def unstructured_mappers(self):
""" Return mappers that will map 'unstructured' files (i.e. go beyond
mapping inputs, outputs, and config files).
Expand Down Expand Up @@ -342,6 +350,9 @@ def _extend_base_dict(self, **kwds):
base_dict.update(**kwds)
return base_dict

def finalize(self):
pass

def to_dict(self):
return self._extend_base_dict()

Expand Down Expand Up @@ -513,6 +524,35 @@ def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class JsonTransferAction(BaseAction):
"""
This action indicates that the pulsar server should create a JSON manifest that can be used to stage files by an
external system that can stage files in and out of the compute environment.
"""
inject_url = True
whole_directory_transfer_supported = True
action_type = "json_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister)
self.url = url
self._path = None

@classmethod
def from_dict(cls, action_dict):
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])

def write_to_path(self, path):
self._path = path

def write_from_path(self, pulsar_path: str):
self._path = pulsar_path

def finalize(self):
return {"url": self.url, "path": self.path}


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
Expand Down Expand Up @@ -664,6 +704,7 @@ def write_to_path(self, path):


DICTIFIABLE_ACTION_CLASSES = [
JsonTransferAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
Expand Down Expand Up @@ -844,6 +885,7 @@ def unstructured_map(self, path):

ACTION_CLASSES: List[Type[BaseAction]] = [
NoneAction,
JsonTransferAction,
RewriteAction,
TransferAction,
CopyAction,
Expand Down
6 changes: 6 additions & 0 deletions pulsar/client/staging/down.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def collect(self):
self.__collect_other_working_directory_files()
self.__collect_metadata_directory_files()
self.__collect_job_directory_files()
# Give actions that require a final action, like those that write a manifest, to write out their content
self.__finalize_action_mapper()
# finalize collection here for executors that need this ?
return self.exception_tracker.collection_failure_exceptions

def __collect_working_directory_outputs(self):
Expand Down Expand Up @@ -134,6 +137,9 @@ def __collect_job_directory_files(self):
'output_jobdir',
)

def __finalize_action_mapper(self):
self.action_mapper.finalize()

def __realized_dynamic_file_source_references(self):
references = {"filename": [], "extra_files": []}

Expand Down
2 changes: 2 additions & 0 deletions pulsar/managers/staging/pre.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def preprocess(job_directory, setup_actions, action_executor, object_store=None)
action = from_dict(setup_action["action"])
if getattr(action, "inject_object_store", False):
action.object_store = object_store
if getattr(action, "inject_job_directory", False):
action.job_directory = job_directory
path = job_directory.calculate_path(name, input_type)
description = "Staging {} '{}' via {} to {}".format(input_type, name, action, path)
log.debug(description)
Expand Down
12 changes: 12 additions & 0 deletions test/action_mapper_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@
)


def test_action_mapper_finalization():
client = _client("json_transfer")
mapper = FileActionMapper(client)
mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input')
mapper.action({'path': '/the_file'}, 'input')
mapper_summary = mapper.finalize()
assert len(mapper_summary) == 2
assert mapper_summary[0]["path"] == '/opt/galaxy/tools/filters/catWrapper.py'
assert mapper_summary[1]["path"] == '/the_file'



def test_endpoint_validation():
client = _min_client("remote_transfer")
mapper = FileActionMapper(client)
Expand Down
30 changes: 29 additions & 1 deletion test/transfer_action_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,35 @@
import os

from .test_utils import files_server
from pulsar.client.action_mapper import RemoteTransferAction
from pulsar.client.action_mapper import (
JsonTransferAction,
RemoteTransferAction,
)


def test_write_to_path_json():
with files_server() as (server, directory):
from_path = os.path.join(directory, "remote_get")

to_path = os.path.join(directory, "local_get")
url = server.application_url + "?path=%s" % from_path
action = JsonTransferAction({"path": to_path}, url=url)
action.write_to_path(to_path)
assert action.path == to_path
assert action.url == url
assert action.finalize() == {"path": to_path, "url": url}


def test_write_from_file_json():
with files_server() as (server, directory):
from_path = os.path.join(directory, "local_post")
to_path = os.path.join(directory, "remote_post")
url = server.application_url + "?path=%s" % to_path
action = JsonTransferAction({"path": to_path}, url=url)
action.write_from_path(from_path)
assert action.path == to_path
assert action.url == url
assert action.finalize() == {"path": to_path, "url": url}


def test_write_to_file():
Expand Down

0 comments on commit 8d4e7e9

Please sign in to comment.