Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option for pipeline to fail with exception #318

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions eogrow/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
LOGGER = logging.getLogger(__name__)


class PipelineExecutionError(RuntimeError):
"""Raised when the pipeline failed some executions."""


class Pipeline(EOGrowObject):
"""A base class for execution of processing procedures which may or may not include running EOWorkflows, running
EOExecutions, creating maps, etc.
Expand Down Expand Up @@ -212,6 +216,7 @@ def run(self) -> None:
"""The main method for pipeline execution. It sets up logging and runs the pipeline procedure."""
timestamp = current_timestamp()
self.current_execution_name = self.get_pipeline_execution_name(timestamp)
log_folder = self.logging_manager.get_pipeline_logs_folder(self.current_execution_name, full_path=True)

root_logger = logging.getLogger()
handlers = self.logging_manager.start_logging(root_logger, self.current_execution_name, "pipeline.log")
Expand All @@ -231,10 +236,7 @@ def run(self) -> None:
elapsed_time = time.time() - pipeline_start

if failed:
LOGGER.info(
"Pipeline finished with some errors! Check %s",
self.logging_manager.get_pipeline_logs_folder(self.current_execution_name, full_path=True),
)
LOGGER.info("Pipeline finished with some errors! Check %s", log_folder)
else:
LOGGER.info("Pipeline finished successfully!")

Expand All @@ -250,6 +252,9 @@ def run(self) -> None:
self.logging_manager.save_eopatch_execution_status(
pipeline_execution_name=self.current_execution_name, finished=finished, failed=failed
)

if failed and self.config.raise_if_failed:
raise PipelineExecutionError(f"Pipeline failed some executions. Check {log_folder}.")
finally:
self.logging_manager.stop_logging(root_logger, handlers)

Expand Down
3 changes: 3 additions & 0 deletions eogrow/core/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class PipelineSchema(BaseSchema):
raise_on_temporal_mismatch: bool = Field(
False, description="Treat `TemporalDimensionWarning` as an exception during EOExecution."
)
raise_if_failed: bool = Field(
False, description="Raise an exception if `run_procedure` returns some executions in `failed`."
)
debug: bool = Field(False, description="Run pipeline without the `ray` wrapper to enable debugging.")


Expand Down
24 changes: 23 additions & 1 deletion tests/core/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sentinelhub import CRS, BBox

from eogrow.core.config import interpret_config_from_path
from eogrow.core.pipeline import Pipeline
from eogrow.core.pipeline import Pipeline, PipelineExecutionError


@pytest.fixture(scope="session", name="simple_config_filename")
Expand All @@ -33,6 +33,14 @@ def run_procedure(self) -> Tuple[List[str], List[str]]:
return finished[:-1], finished[-1:] + failed


class FailingPipeline(Pipeline):
class Schema(Pipeline.Schema):
fail: bool

def run_procedure(self) -> Tuple[List[str], List[str]]:
return [], ([0] if self.config.fail else [])


def test_pipeline_execution(simple_config_filename: str) -> None:
"""Tests that appropriate folders and log files are created."""
config = interpret_config_from_path(simple_config_filename)
Expand Down Expand Up @@ -88,3 +96,17 @@ def test_get_patch_list_filtration_error(test_subset: List[Union[int, str]], sim
pipeline = SimplePipeline.from_raw_config(config)
with pytest.raises(ValueError):
pipeline.get_patch_list()


@pytest.mark.parametrize("fail", [True, False])
@pytest.mark.parametrize("raise_if_failed", [True, False])
def test_pipeline_raises_on_failure(fail: bool, raise_if_failed: bool, simple_config_filename: str):
config = interpret_config_from_path(simple_config_filename)
config.pop("test_param")
pipeline = FailingPipeline.from_raw_config({"fail": fail, "raise_if_failed": raise_if_failed, **config})

if fail and raise_if_failed:
with pytest.raises(PipelineExecutionError):
pipeline.run()
else:
pipeline.run()