Skip to content

Commit

Permalink
WIP: further progress with test
Browse files Browse the repository at this point in the history
  • Loading branch information
kysrpex committed Nov 13, 2024
1 parent 7170049 commit bfbb3c3
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 4 deletions.
3 changes: 3 additions & 0 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ def __init__(self, source, file_lister=None, url=None):
def from_dict(cls, action_dict):
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])

def to_dict(self):
    """Serialize this action to a dictionary.

    Delegates to ``_extend_base_dict``, handing it this action's ``url`` as a
    keyword argument (``from_dict`` reads the same ``url`` key back).
    """
    serialized = self._extend_base_dict(url=self.url)
    return serialized

def write_to_path(self, path):
self._to_path = path

Expand Down
1 change: 1 addition & 0 deletions pulsar/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ def launch(
# 4. stage outputs back using manifest [handled by ARC]
pass


class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient):
execution_type: ExecutionType
pulsar_container_image: str
Expand Down
7 changes: 7 additions & 0 deletions pulsar/client/staging/up.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ def submit_job(client, client_job_description, job_config=None):
launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
launch_kwds["token_endpoint"] = client.token_endpoint

# populate `to_path`
for action in file_stager.action_mapper.actions:
name = action.path
input_type = "input"
path = file_stager.job_directory.calculate_path(name, input_type)
action.write_to_path(path)

staging_manifest = file_stager.action_mapper.finalize()
if staging_manifest:
launch_kwds["staging_manifest"] = staging_manifest
Expand Down
85 changes: 85 additions & 0 deletions pulsar/scripts/staging_arc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python

"""Stage files in or out of a compute environment made available via the Advanced Resource Connector (ARC) [1].
This script reads pairs of source and target URLs (with `http`, `https` or `file` as URL scheme) and/or paths, passed
as command line arguments and/or read from a file containing a JSON array. It then reads the files from the source URLs
and posts them to the target URLs (or copies them, for `file://` URLs).
Example usage:
```shell
$ ./staging_arc.py --stage https://example.org file.dat --stage file:///home/user/text.txt https://example.org/files \
--json staging_manifest.json
```
_staging_manifest.json_
```json
[
{
"source": "file:///home/user/data.txt",
"target": "file:///home/person/file.txt"
},
{
"source": "file:///home/user/analysis.txt",
"target": "https://example.org/files/analysis.txt"
}
]
```
Retrieve files from a set of source URLs and save them to a set of target URLs.
References:
- [1] https://www.nordugrid.org/arc/about-arc.html
"""

# When the URL is the target, use POST.

import aiohttp
import json
import sys
from typing import Iterable
from typing import Literal
from dataclasses import dataclass, field
from argparse import ArgumentParser
from typing import Optional


@dataclass
class StagingDeclaration:
    """Pair the location a file is read from with the location it is saved to.

    Both fields hold URLs as plain strings.
    """

    # URL the file is read from
    source: str
    # URL the file is saved to
    target: str


...


def parse_json_manifest() -> tuple[StagingDeclaration, ...]:
    """Return the staging declarations described by a JSON manifest.

    The return annotation is a variable-length tuple: ``tuple[X]`` would denote
    a tuple of exactly one element, whereas a manifest is a JSON array that may
    hold any number of source/target pairs.

    NOTE(review): this is still a placeholder -- the body is ``...`` and the
    function currently returns ``None``. Presumably it will eventually take the
    manifest path (or its contents) as input; confirm once implemented.
    """
    ...


HELP_STAGE = "Read a file from `source` and save it to `target`."
HELP_JSON = "Read a list of `source` and `target` URLs from a JSON file."


def make_parser() -> ArgumentParser:
    """Construct the argument parser used to call the script from the command line.

    The module docstring is used as the parser description and is shown
    verbatim in ``--help`` output.

    Returns:
        A parser accepting two repeatable options:

        * ``--stage SOURCE TARGET`` -- stored in ``stage`` as a list of
          two-element lists, or ``None`` when the option is never given.
        * ``--json FILE`` -- stored in ``json`` as a list of one-element lists,
          or ``None`` when the option is never given.
    """
    # Imported locally so this function does not rely on the module-level import list.
    from argparse import RawDescriptionHelpFormatter

    parser = ArgumentParser(
        description=sys.modules[__name__].__doc__,
        # By default argparse re-wraps the description, which would collapse the multi-line shell and JSON
        # examples of the module docstring into a single paragraph; keep the text as written instead.
        formatter_class=RawDescriptionHelpFormatter,
    )

    parser.add_argument(
        "--stage", dest="stage", metavar=("source", "target"), nargs=2, action="append", help=HELP_STAGE
    )
    # `nargs=1` together with `action="append"` stores every occurrence as its own one-element list.
    parser.add_argument("--json", dest="json", nargs=1, action="append", help=HELP_JSON)

    return parser


if __name__ == "__main__":
    # Invoke script from the command line: build the parser, then parse
    # everything after the program name.
    argument_parser = make_parser()
    cli_arguments = sys.argv[1:]
    args = argument_parser.parse_args(cli_arguments)
1 change: 1 addition & 0 deletions pulsar/scripts/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def main(args=None):
add_common_submit_args(arg_parser)
arg_parser.add_argument('--wait', action='store_true')
arg_parser.add_argument('--no_wait', "--no-wait", dest='wait', action='store_false')
arg_parser.add_argument('--build_client_manager', action='store_true')
arg_parser.set_defaults(wait=True)
args = arg_parser.parse_args(args)
run_server_for_job(args)
Expand Down
20 changes: 20 additions & 0 deletions pulsar/scripts/submit_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import logging
import time

from pulsar.client import ClientJobDescription, ClientOutputs, ClientInput
from pulsar.client import submit_job as submit_client_job
from pulsar.client.manager import build_client_manager
from pulsar.client.util import from_base64_json
from pulsar.main import (
load_pulsar_app,
Expand Down Expand Up @@ -32,6 +35,23 @@ def run_server_for_job(args):
submit_job(manager, job_config)
if wait:
log.info("Co-execution job setup, now waiting for job completion and postprocessing.")
if args.build_client_manager:
client_manager = build_client_manager(arc_enabled=True)
client = client_manager.get_client({"arc_url": "http://localhost:8082", "jobs_directory": app.staging_directory}, job_id=job_config["job_id"], default_file_action=job_config["remote_staging"]["action_mapper"]["default_action"], files_endpoint=job_config["remote_staging"]["action_mapper"]["files_endpoint"])
# FIXME: we can probably only test the input staging here, so adjust tests accordingly
client_inputs = [
ClientInput(path=action_source["path"], input_type="input_path")
for action_source in job_config["remote_staging"]["client_inputs"]
]
client_outputs = ClientOutputs.from_dict(job_config["remote_staging"]["client_outputs"])
job_description = ClientJobDescription(
command_line=job_config["command_line"],
working_directory=client_outputs.working_directory,
client_inputs=client_inputs,
client_outputs=client_outputs,
)
job_config["working_directory"] = client_outputs.working_directory
submit_client_job(client, job_description)
wait_for_job(manager, job_config)
log.info("Leaving finish_execution and shutting down app")
except BaseException:
Expand Down
23 changes: 19 additions & 4 deletions test/test_cli_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
temp_directory_persist,
)

from pulsar.client import ClientOutputs
from pulsar.client import ClientOutputs, ClientInput
from pulsar.client.util import to_base64_json
from pulsar.scripts import submit

Expand All @@ -25,19 +25,33 @@ def setup_action_mapper(self, files_endpoint):
def run_and_check_submission(self):
job_id = "0"
galaxy_working = temp_directory_persist()
input_name = "dataset_1.dat"
output_name = "dataset_1211231231231231231.dat"
galaxy_input = os.path.join(galaxy_working, input_name)
with open(galaxy_input, "w") as handle:
handle.write("cow_file_contents\n")
galaxy_output = os.path.join(galaxy_working, output_name)
pulsar_input = os.path.join(self.staging_directory, job_id, "inputs", input_name)
pulsar_output = os.path.join(
self.staging_directory, job_id, "outputs", output_name
)
pulsar_input = os.path.join(self.staging_directory, job_id, "inputs", "cow")
with files_server("/") as test_files_server:
files_endpoint = test_files_server.application_url
action = {
"name": "cow",
"type": "input",
"action": {"action_type": "message", "contents": "cow file contents\n"},
"action": {
"action_type": "json_transfer",
"files_endpoint": files_endpoint,
"path": galaxy_input,
},
}
client_inputs = [
ClientInput(
path=galaxy_input,
input_type="input_path",
).action_source
]
client_outputs = ClientOutputs(
working_directory=galaxy_working,
output_files=[os.path.join(galaxy_working, output_name)],
Expand All @@ -52,12 +66,13 @@ def run_and_check_submission(self):
remote_staging={
"setup": [action],
"action_mapper": self.setup_action_mapper(files_endpoint),
"client_inputs": client_inputs,
"client_outputs": client_outputs.to_dict(),
},
)
base64 = to_base64_json(launch_params)
assert not os.path.exists(galaxy_output)
submit.main(["--build_client_manager", "true", "--base64", base64] + self._encode_application())
submit.main(["--build_client_manager", "--base64", base64] + self._encode_application())
assert os.path.exists(galaxy_output)
out_contents = open(galaxy_output).read()
assert out_contents == "cow file contents\n", out_contents
Expand Down

0 comments on commit bfbb3c3

Please sign in to comment.