Skip to content

Commit

Permalink
Merge pull request #1 from mapk-amazon/desirable-grasshopper
Browse files Browse the repository at this point in the history
Change JobState Retry-Strategy
  • Loading branch information
afgane authored Oct 17, 2024
2 parents 85254bc + 5288774 commit 60722e7
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 26 deletions.
51 changes: 31 additions & 20 deletions lib/galaxy/jobs/runners/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
produce_k8s_job_prefix,
pull_policy,
pykube_client_from_dict,
reload_job,
Service,
service_object_dict,
)
Expand All @@ -53,13 +54,16 @@

__all__ = ("KubernetesJobRunner",)


@dataclass
class RetryableDeleteJob:
k8s_job: Job
retries: int = 5 # Max number of retries
attempts: int = 0 # Current number of attempts

class RetryableDeleteJobState(JobState):
    """A ``JobState`` wrapper carrying retry bookkeeping for deleting a k8s job.

    Built from an existing ``JobState`` by adopting all of its attributes,
    plus the pykube ``Job`` object to delete and retry counters.
    """

    def __init__(self, job_state, k8s_job, max_retries=5, attempts=0):
        # Adopt every attribute of the wrapped JobState via a shallow copy
        # of its instance dict, then layer the retry state on top.
        self.__dict__ = dict(vars(job_state))
        self.init_retryable_job(max_retries, attempts)
        self.k8s_job = k8s_job

    def init_retryable_job(self, max_retries, attempts):
        # max_retries: cap on deletion attempts; attempts: tries made so far.
        self.max_retries: int = max_retries
        self.attempts: int = attempts

class KubernetesJobRunner(AsynchronousJobRunner):
"""
Expand Down Expand Up @@ -839,7 +843,7 @@ def _handle_unschedulable_job(self, k8s_job, job_state):
if self.__has_guest_ports(job_state.job_wrapper):
self.__cleanup_k8s_guest_ports(job_state.job_wrapper, k8s_job)
# Wrap the k8s job before we put it in the work queue so it can be retried a few times
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJob(k8s_job=k8s_job)))
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJobState(job_state=job_state, k8s_job=k8s_job)))
except Exception:
log.exception("Could not clean up an unschedulable k8s batch job. Ignoring...")
return None
Expand Down Expand Up @@ -879,33 +883,40 @@ def _handle_job_failure(self, k8s_job, job_state):
if self.__has_guest_ports(job_state.job_wrapper):
self.__cleanup_k8s_guest_ports(job_state.job_wrapper, k8s_job)
# Wrap the k8s job before we put it in the work queue so it can be retried a few times
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJob(k8s_job=k8s_job)))
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJobState(job_state=job_state, k8s_job=k8s_job)))
except Exception:
log.exception("Could not clean up a failed k8s batch job. Ignoring...")
return mark_failed

def __cleanup_k8s_job(self, retryable_delete_k8s_job_state: RetryableDeleteJobState):
    """Delete the wrapped k8s batch Job, retrying a bounded number of times.

    On an ``HTTPError`` from the API server:
    - a 404 after at least one prior attempt means an earlier deletion
      already succeeded, so the cleanup is skipped;
    - once ``attempts`` has reached ``max_retries`` the error is re-raised;
    - otherwise the job object is refreshed and a new retry state is put
      back on the work queue for another attempt.
    """
    k8s_job = retryable_delete_k8s_job_state.k8s_job
    log.debug(f"Cleaning up job with K8s id {k8s_job.name} (attempt {retryable_delete_k8s_job_state.attempts + 1}).")
    k8s_cleanup_job = self.runner_params["k8s_cleanup_job"]
    try:
        delete_job(k8s_job, k8s_cleanup_job)
    except HTTPError as exc:
        # If job not found, then previous deletion was successful
        if exc.code == 404 and retryable_delete_k8s_job_state.attempts >= 1:
            log.warning(
                f"Cleanup job with K8s id {k8s_job.name} skipped as it is no longer available (404) and a previous deletion was triggered."
            )
            return
        if retryable_delete_k8s_job_state.max_retries <= retryable_delete_k8s_job_state.attempts:
            log.error(
                f"Failed to cleanup job with K8s id {k8s_job.name} after {retryable_delete_k8s_job_state.attempts} of {retryable_delete_k8s_job_state.max_retries} attempts; giving up."
            )
            raise exc
        else:
            # Refresh the job to resolve object & cluster conflicts
            reload_job(k8s_job)
            # Try the cleanup again
            new_retryable_job_state = RetryableDeleteJobState(
                job_state=retryable_delete_k8s_job_state,
                k8s_job=k8s_job,
                max_retries=retryable_delete_k8s_job_state.max_retries,
                attempts=retryable_delete_k8s_job_state.attempts + 1,
            )
            self.work_queue.put((self.__cleanup_k8s_job, new_retryable_job_state))

def __cleanup_k8s_ingress(self, ingress, job_failed=False):
k8s_cleanup_job = self.runner_params["k8s_cleanup_job"]
Expand Down Expand Up @@ -1023,7 +1034,7 @@ def stop_job(self, job_wrapper):
log.debug(f"Job {gxy_job.id} ({gxy_job.job_runner_external_id}) has guest ports, cleaning them up")
self.__cleanup_k8s_guest_ports(job_wrapper, k8s_job)
# Wrap the k8s job before we put it in the work queue so it can be retried a few times
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJob(k8s_job=k8s_job)))
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJobState(job_state=JobState(job_wrapper=job_wrapper,job_destination=job_wrapper.job_destination),k8s_job=k8s_job)))
else:
log.debug(f"Could not find job with id {gxy_job.get_job_runner_external_id()} to delete")
# TODO assert whether job parallelism == 0
Expand Down Expand Up @@ -1156,4 +1167,4 @@ def finish_job(self, job_state):
if self.__has_guest_ports(job_state.job_wrapper):
self.__cleanup_k8s_guest_ports(job_state.job_wrapper, k8s_job)
# Wrap the k8s job before we put it in the work queue so it can be retried a few times
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJob(k8s_job=k8s_job)))
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJobState(job_state=job_state, k8s_job=k8s_job)))
21 changes: 19 additions & 2 deletions lib/galaxy/jobs/runners/util/pykube_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

try:
from pykube.config import KubeConfig
from pykube.exceptions import HTTPError
from pykube.exceptions import (
HTTPError,
ObjectDoesNotExist,
)
from pykube.http import HTTPClient
from pykube.objects import (
Ingress,
Expand Down Expand Up @@ -105,7 +108,12 @@ def is_pod_unschedulable(pykube_api, pod, namespace=None):
def delete_job(job, cleanup="always"):
job_failed = job.obj["status"]["failed"] > 0 if "failed" in job.obj["status"] else False
# Scale down the job just in case even if cleanup is never
job.scale(replicas=0)
try:
job.scale(replicas=0)
except ObjectDoesNotExist as e:
# Okay, job does no longer exist
log.info(e)

api_delete = cleanup == "always"
if not api_delete and cleanup == "onsuccess" and not job_failed:
api_delete = True
Expand Down Expand Up @@ -328,3 +336,12 @@ def galaxy_instance_id(params):
"get_volume_mounts_for_job",
"parse_pvc_param_line",
)

def reload_job(job):
    """Refresh *job* from the cluster, tolerating a job that no longer exists.

    A 404 from the API server means the job was already removed (e.g. a
    previous deletion succeeded), which callers treat as benign before
    retrying a cleanup. Any other ``HTTPError`` is re-raised.
    """
    try:
        job.reload()
    except HTTPError as e:
        # 404: the job vanished between operations -- nothing to refresh.
        if e.code != 404:
            # Bare raise preserves the original traceback (unlike `raise e`).
            raise
8 changes: 4 additions & 4 deletions lib/galaxy/objectstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,10 +1065,10 @@ def _size(self, obj, **kwargs) -> int:

def _delete(self, obj, entire_dir: bool = False, **kwargs) -> bool:
"""Override `ObjectStore`'s stub; delete the file or folder on disk."""
path = self._get_filename(obj, **kwargs)
extra_dir = kwargs.get("extra_dir", None)
obj_dir = kwargs.get("obj_dir", False)
try:
path = self._get_filename(obj, **kwargs)
extra_dir = kwargs.get("extra_dir", None)
obj_dir = kwargs.get("obj_dir", False)
if entire_dir and (extra_dir or obj_dir):
shutil.rmtree(path)
return True
Expand Down Expand Up @@ -1111,7 +1111,7 @@ def _get_filename(self, obj, sync_cache: bool = True, **kwargs) -> str:
return path
path = self._construct_path(obj, **kwargs)
if not os.path.exists(path):
raise ObjectNotFound
raise FileNotFoundError
return path

def _update_from_file(
Expand Down

0 comments on commit 60722e7

Please sign in to comment.