Skip to content

Commit

Permalink
Add option for pipeline to fail with exception (#318)
Browse files Browse the repository at this point in the history
* add option for pipeline to fail with exception

* fix pytest fixture names
  • Loading branch information
zigaLuksic authored Jan 3, 2024
1 parent 0ed0a51 commit ce6e141
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
13 changes: 9 additions & 4 deletions eogrow/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
LOGGER = logging.getLogger(__name__)


class PipelineExecutionError(RuntimeError):
"""Raised when the pipeline failed some executions."""


class Pipeline(EOGrowObject):
"""A base class for execution of processing procedures which may or may not include running EOWorkflows, running
EOExecutions, creating maps, etc.
Expand Down Expand Up @@ -212,6 +216,7 @@ def run(self) -> None:
"""The main method for pipeline execution. It sets up logging and runs the pipeline procedure."""
timestamp = current_timestamp()
self.current_execution_name = self.get_pipeline_execution_name(timestamp)
log_folder = self.logging_manager.get_pipeline_logs_folder(self.current_execution_name, full_path=True)

root_logger = logging.getLogger()
handlers = self.logging_manager.start_logging(root_logger, self.current_execution_name, "pipeline.log")
Expand All @@ -231,10 +236,7 @@ def run(self) -> None:
elapsed_time = time.time() - pipeline_start

if failed:
LOGGER.info(
"Pipeline finished with some errors! Check %s",
self.logging_manager.get_pipeline_logs_folder(self.current_execution_name, full_path=True),
)
LOGGER.info("Pipeline finished with some errors! Check %s", log_folder)
else:
LOGGER.info("Pipeline finished successfully!")

Expand All @@ -250,6 +252,9 @@ def run(self) -> None:
self.logging_manager.save_eopatch_execution_status(
pipeline_execution_name=self.current_execution_name, finished=finished, failed=failed
)

if failed and self.config.raise_if_failed:
raise PipelineExecutionError(f"Pipeline failed some executions. Check {log_folder}.")
finally:
self.logging_manager.stop_logging(root_logger, handlers)

Expand Down
3 changes: 3 additions & 0 deletions eogrow/core/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class PipelineSchema(BaseSchema):
raise_on_temporal_mismatch: bool = Field(
False, description="Treat `TemporalDimensionWarning` as an exception during EOExecution."
)
raise_if_failed: bool = Field(
False, description="Raise an exception if `run_procedure` returns some executions in `failed`."
)
debug: bool = Field(False, description="Run pipeline without the `ray` wrapper to enable debugging.")


Expand Down
24 changes: 23 additions & 1 deletion tests/core/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sentinelhub import CRS, BBox

from eogrow.core.config import interpret_config_from_path
from eogrow.core.pipeline import Pipeline
from eogrow.core.pipeline import Pipeline, PipelineExecutionError


@pytest.fixture(scope="session", name="simple_config_filename")
Expand All @@ -33,6 +33,14 @@ def run_procedure(self) -> Tuple[List[str], List[str]]:
return finished[:-1], finished[-1:] + failed


class FailingPipeline(Pipeline):
class Schema(Pipeline.Schema):
fail: bool

def run_procedure(self) -> Tuple[List[str], List[str]]:
return [], ([0] if self.config.fail else [])


def test_pipeline_execution(simple_config_filename: str) -> None:
"""Tests that appropriate folders and log files are created."""
config = interpret_config_from_path(simple_config_filename)
Expand Down Expand Up @@ -88,3 +96,17 @@ def test_get_patch_list_filtration_error(test_subset: List[Union[int, str]], sim
pipeline = SimplePipeline.from_raw_config(config)
with pytest.raises(ValueError):
pipeline.get_patch_list()


@pytest.mark.parametrize("fail", [True, False])
@pytest.mark.parametrize("raise_if_failed", [True, False])
def test_pipeline_raises_on_failure(fail: bool, raise_if_failed: bool, simple_config_filename: str):
config = interpret_config_from_path(simple_config_filename)
config.pop("test_param")
pipeline = FailingPipeline.from_raw_config({"fail": fail, "raise_if_failed": raise_if_failed, **config})

if fail and raise_if_failed:
with pytest.raises(PipelineExecutionError):
pipeline.run()
else:
pipeline.run()

0 comments on commit ce6e141

Please sign in to comment.