From f549bf99d881f0cde51b4e2a625321540641c06c Mon Sep 17 00:00:00 2001 From: Cameron Hyde Date: Sun, 16 Jul 2023 13:18:09 +1000 Subject: [PATCH] Skip file transfer when the source file is absent --- pulsar/client/action_mapper.py | 3 +++ pulsar/managers/staging/post.py | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index 2b0cc769..6f0c9baf 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -306,6 +306,9 @@ def __init__(self, source, file_lister=None): def path(self): return self.source.get("path") + def path_exists(self): + return exists(self.path) + def unstructured_map(self, path_helper): unstructured_map = self.file_lister.unstructured_map(self.path) if self.staging_needed: diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py index ebe4bc6a..77904125 100644 --- a/pulsar/managers/staging/post.py +++ b/pulsar/managers/staging/post.py @@ -79,7 +79,15 @@ def collect_output(self, results_collector, output_type, action, name): pulsar_path = self.job_directory.calculate_path(name, output_type) description = "staging out file {} via {}".format(pulsar_path, action) - self.action_executor.execute(lambda: action.write_from_path(pulsar_path), description) + + if not action.path_exists(): + log.warn( + f"Output not collectable. File does not exist: {pulsar_path}") + else: + self.action_executor.execute( + lambda: action.write_from_path(pulsar_path), + description, + ) def __pulsar_outputs(job_directory):