Skip to content

Commit

Permalink
WIP: setup local sequential execution
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Nov 11, 2024
1 parent 7a874fc commit d7bb304
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
5 changes: 4 additions & 1 deletion pulsar/client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
MessageJobClient,
TesMessageCoexecutionJobClient,
TesPollingCoexecutionJobClient,
LocalSequentialClient,
)
from .destination import url_to_destination_params
from .object_client import ObjectStoreClient
Expand Down Expand Up @@ -256,6 +257,8 @@ def get_client(self, destination_params, job_id, **kwargs):
return K8sPollingCoexecutionJobClient(destination_params, job_id, self)
elif destination_params.get("tes_url", False):
return TesPollingCoexecutionJobClient(destination_params, job_id, self)
elif destination_params.get("arc_url", False):
return LocalSequentialClient(destination_params, job_id, self)
else:
raise Exception("Unknown client configuration")

Expand All @@ -268,7 +271,7 @@ def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface:
return ClientManager(**kwargs) # TODO: Consider more separation here.
elif kwargs.get('amqp_url', None):
return MessageQueueClientManager(**kwargs)
elif kwargs.get("k8s_enabled") or kwargs.get("tes_url"):
elif kwargs.get("k8s_enabled") or kwargs.get("tes_url") or kwargs.get("arc_enabled"):
return PollingJobClientManager(**kwargs)
else:
return ClientManager(**kwargs)
Expand Down
6 changes: 6 additions & 0 deletions pulsar/scripts/submit_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time

from pulsar.client.util import from_base64_json
from pulsar.client.manager import build_client_manager
from pulsar.main import (
load_pulsar_app,
PulsarManagerConfigBuilder,
Expand All @@ -20,6 +21,7 @@
def add_common_submit_args(arg_parser):
arg_parser.add_argument("--file", default=None)
arg_parser.add_argument("--base64", default=None)
arg_parser.add_argument("--build_client_manager", default=None)
PulsarManagerConfigBuilder.populate_options(arg_parser)


Expand All @@ -32,6 +34,10 @@ def run_server_for_job(args):
submit_job(manager, job_config)
if wait:
log.info("Co-execution job setup, now waiting for job completion and postprocessing.")
if args.build_client_manager:
client_manager = build_client_manager(arc_enabled=True)
client = client_manager.get_client({"arc_url": "http://localhost:8082", "jobs_directory": "/works"}, job_id=job_config["job_id"])
client.launch()
wait_for_job(manager, job_config)
log.info("Leaving finish_execution and shutting down app")
except BaseException:
Expand Down
3 changes: 2 additions & 1 deletion test/test_cli_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def run_and_check_submission(self):
)
base64 = to_base64_json(launch_params)
assert not os.path.exists(galaxy_output)
submit.main(["--base64", base64] + self._encode_application())
submit.main(["--build_client_manager", "true", "--base64", base64] + self._encode_application())
assert os.path.exists(galaxy_output)
out_contents = open(galaxy_output).read()
assert out_contents == "cow file contents\n", out_contents
Expand Down Expand Up @@ -127,5 +127,6 @@ def _encode_application(self):
staging_directory=self.staging_directory,
message_queue_url="memory://submittest",
conda_auto_init=False,
manager={"type": "coexecution"}
)
return ["--app_conf_base64", to_base64_json(app_conf)]

0 comments on commit d7bb304

Please sign in to comment.