Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not attempt to complete pre- or post-process if jobs are cancelled in the middle of either stage #365

Merged
merged 1 commit into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions pulsar/managers/staging/post.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@
log = logging.getLogger(__name__)


def postprocess(job_directory, action_executor):
def postprocess(job_directory, action_executor, was_cancelled):
# Returns True if outputs were collected.
try:
if job_directory.has_metadata("launch_config"):
staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None)
else:
staging_config = None
collected = __collect_outputs(job_directory, staging_config, action_executor)
collected = __collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
return collected
finally:
job_directory.write_file("postprocessed", "")
return False


def __collect_outputs(job_directory, staging_config, action_executor):
def __collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
collected = True
if "action_mapper" in staging_config:
file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
client_outputs = staging.ClientOutputs.from_dict(staging_config["client_outputs"])
pulsar_outputs = __pulsar_outputs(job_directory)
output_collector = PulsarServerOutputCollector(job_directory, action_executor)
output_collector = PulsarServerOutputCollector(job_directory, action_executor, was_cancelled)
results_collector = ResultsCollector(output_collector, file_action_mapper, client_outputs, pulsar_outputs)
collection_failure_exceptions = results_collector.collect()
if collection_failure_exceptions:
Expand Down Expand Up @@ -62,11 +62,17 @@ def realized_dynamic_file_sources(job_directory):

class PulsarServerOutputCollector:

def __init__(self, job_directory, action_executor):
def __init__(self, job_directory, action_executor, was_cancelled):
self.job_directory = job_directory
self.action_executor = action_executor
self.was_cancelled = was_cancelled

def collect_output(self, results_collector, output_type, action, name):
def action_if_not_cancelled():
if self.was_cancelled():
log.info(f"Skipped output collection '{name}', job is cancelled")
return
action.write_from_path(pulsar_path)
# Not using input path, this is because action knows it path
# in this context.
if action.staging_action_local:
Expand All @@ -79,7 +85,7 @@ def collect_output(self, results_collector, output_type, action, name):

pulsar_path = self.job_directory.calculate_path(name, output_type)
description = "staging out file {} via {}".format(pulsar_path, action)
self.action_executor.execute(lambda: action.write_from_path(pulsar_path), description)
self.action_executor.execute(action_if_not_cancelled, description)


def __pulsar_outputs(job_directory):
Expand Down
5 changes: 4 additions & 1 deletion pulsar/managers/staging/pre.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
log = logging.getLogger(__name__)


def preprocess(job_directory, setup_actions, action_executor, object_store=None):
def preprocess(job_directory, setup_actions, action_executor, was_cancelled, object_store=None):
for setup_action in setup_actions:
if was_cancelled():
log.info("Exiting preprocessing, job is cancelled")
return
name = setup_action["name"]
input_type = setup_action["type"]
action = from_dict(setup_action["action"])
Expand Down
10 changes: 8 additions & 2 deletions pulsar/managers/stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import threading
import time
from functools import partial

try:
# If galaxy-lib or Galaxy 19.05 present.
Expand Down Expand Up @@ -103,6 +104,7 @@ def _launch_prepreprocessing_thread(self, job_id, launch_config):
def do_preprocess():
with self._handling_of_preprocessing_state(job_id, launch_config):
job_directory = self._proxied_manager.job_directory(job_id)
was_cancelled = partial(self._proxied_manager._was_cancelled, job_id)
staging_config = launch_config.get("remote_staging", {})
# TODO: swap out for a generic "job_extra_params"
if 'action_mapper' in staging_config and \
Expand All @@ -111,7 +113,7 @@ def do_preprocess():
for action in staging_config['setup']:
action['action'].update(ssh_key=staging_config['action_mapper']['ssh_key'])
setup_config = staging_config.get("setup", [])
preprocess(job_directory, setup_config, self.__preprocess_action_executor, object_store=self.object_store)
preprocess(job_directory, setup_config, self.__preprocess_action_executor, was_cancelled, object_store=self.object_store)
self.active_jobs.deactivate_job(job_id, active_status=ACTIVE_STATUS_PREPROCESSING)

new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False)
Expand All @@ -121,6 +123,9 @@ def _handling_of_preprocessing_state(self, job_id, launch_config):
job_directory = self._proxied_manager.job_directory(job_id)
try:
yield
if self._proxied_manager._was_cancelled(job_id):
log.info("Exiting job launch, job is cancelled")
return
launch_kwds = {}
if launch_config.get("dependencies_description"):
dependencies_description = DependenciesDescription.from_dict(launch_config["dependencies_description"])
Expand Down Expand Up @@ -219,8 +224,9 @@ def __handle_postprocessing(self, job_id):
def do_postprocess():
postprocess_success = False
job_directory = self._proxied_manager.job_directory(job_id)
was_cancelled = partial(self._proxied_manager._was_cancelled, job_id)
try:
postprocess_success = postprocess(job_directory, self.__postprocess_action_executor)
postprocess_success = postprocess(job_directory, self.__postprocess_action_executor, was_cancelled)
except Exception:
log.exception("Failed to postprocess results for job id %s" % job_id)
final_status = status.COMPLETE if postprocess_success else status.FAILED
Expand Down
Loading