From 81179c24867a8dbc64d38f434caee9de17c6c66c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=BDiga=20Luk=C5=A1i=C4=8D?= <31988337+zigaLuksic@users.noreply.github.com>
Date: Wed, 19 Jun 2024 12:04:23 +0200
Subject: [PATCH] Retry 3 times if batch download monitoring returns 404 (#350)

* try 3 times to monitor

* fix mypy issues

* switch to decorator, use on _trigger_user_action

* fix wrong annotation
---
 eogrow/pipelines/download_batch.py | 33 +++++++++++++++++++++++++++++-
 eogrow/tasks/common.py             |  2 +-
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/eogrow/pipelines/download_batch.py b/eogrow/pipelines/download_batch.py
index bcc52cff..4a0bc515 100644
--- a/eogrow/pipelines/download_batch.py
+++ b/eogrow/pipelines/download_batch.py
@@ -3,11 +3,15 @@
 from __future__ import annotations
 
 import logging
+import time
 from collections import defaultdict
-from typing import Any, List, Literal, Optional
+from functools import wraps
+from typing import Any, Callable, List, Literal, Optional, TypeVar
 
 import fs
+import requests
 from pydantic import Field
+from typing_extensions import ParamSpec
 
 from sentinelhub import (
     BatchRequest,
@@ -23,6 +27,7 @@
     monitor_batch_analysis,
     monitor_batch_job,
 )
+from sentinelhub.exceptions import DownloadFailedException
 
 from ..core.area.batch import BatchAreaManager
 from ..core.pipeline import Pipeline
@@ -37,6 +42,31 @@
 )
 
 LOGGER = logging.getLogger(__name__)
+T = TypeVar("T")
+P = ParamSpec("P")
+
+
+def _retry_on_404(func: Callable[P, T]) -> Callable[P, T]:
+    @wraps(func)
+    def retrying_func(*args: P.args, **kwargs: P.kwargs) -> T:
+        for wait_time in [0, 10, 100]:
+            time.sleep(wait_time)  # if we start monitoring too soon we might hit a 404
+            try:
+                return func(*args, **kwargs)
+            except DownloadFailedException as e:
+                if (
+                    e.request_exception is not None
+                    and e.request_exception.response is not None
+                    and e.request_exception.response.status_code == requests.status_codes.codes.NOT_FOUND
+                ):
+                    LOGGER.info("Received error 404 on monitoring endpoint. Retrying in a while.")
+                    continue  # we retry on 404
+                raise e
+
+        time.sleep(wait_time)  # uses longest wait time from loop
+        return func(*args, **kwargs)  # try one last time and fail explicitly
+
+    return retrying_func
 
 
 class InputDataSchema(BaseSchema):
@@ -247,6 +277,7 @@ def _get_evalscript(self) -> str:
         with self.storage.filesystem.open(evalscript_path) as evalscript_file:
             return evalscript_file.read()
 
+    @_retry_on_404
     def _trigger_user_action(self, batch_request: BatchRequest) -> BatchUserAction:
         """According to status and configuration parameters decide what kind of user action to perform."""
         if self.config.analysis_only:
diff --git a/eogrow/tasks/common.py b/eogrow/tasks/common.py
index 3b5e868e..6deab8fb 100644
--- a/eogrow/tasks/common.py
+++ b/eogrow/tasks/common.py
@@ -46,7 +46,7 @@ def execute(self, eopatch: EOPatch) -> EOPatch:
 
         for label in self.labels:
             label_mask = np.squeeze((mask == label).astype(np.uint8), axis=-1)
-            mask_mod = morp_func(label_mask) * label  # type: ignore[operator]
+            mask_mod: np.ndarray = morp_func(label_mask) * label  # type: ignore[assignment]
             mask_mod = mask_mod[..., np.newaxis]
             mask[mask == label] = mask_mod[mask == label]
 