Skip to content

Commit

Permalink
Added method to fetch status directly from k8s to be used at strategi…
Browse files Browse the repository at this point in the history
…c events
  • Loading branch information
alfredeen committed Aug 23, 2024
1 parent af3d0d3 commit 20c8d09
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
15 changes: 9 additions & 6 deletions serve_event_listener/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ def setup(self, **kwargs: Optional[Any]) -> None:
"\n\n\t{}\n\t Running Setup Process \n\t{}\n".format("#" * 30, "#" * 30)
)
try:
self.check_status()
self.check_serve_api_status()
self.setup_client()
self.token = self.fetch_token()
self._status_data = StatusData()
self._status_data.set_k8s_api_client(self.client, self.namespace)
self._status_queue = StatusQueue(self.post, self.token)
self.setup_complete = True
except Exception as e:
Expand Down Expand Up @@ -149,7 +150,7 @@ def listen(self) -> None:
else:
logger.warning("Setup not completed - run .setup() first")

def check_status(self) -> bool:
def check_serve_api_status(self) -> bool:
"""
Checks the status of the Serve API.
Expand Down Expand Up @@ -193,17 +194,19 @@ def setup_client(self) -> None:
self.watch = watch.Watch()

def list_all_pods(self):
logger.info("Listing all pods and status codes")
logger.info("Listing all pods and their status codes")

try:
api_response = self.client.list_namespaced_pod(
self.namespace, limit=500, timeout_seconds=120, watch=False
)

for pod in api_response.items:
# TODO get_status() similar to logic in StatusData
# Make this a helper function and unit test?
print(f"{pod.metadata.name} with status {pod.status}")
release = pod.metadata.labels.get("release")
app_status = StatusData.determine_status_from_k8s(pod.status)
logger.info(
f"Release={release}, {pod.metadata.name} with status {app_status}"
)
except ApiException as e:
logger.warning(
f"Exception when calling CoreV1Api->list_namespaced_pod. {e}"
Expand Down
69 changes: 67 additions & 2 deletions serve_event_listener/status_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Any, Dict, Tuple, Union

import requests
from kubernetes import client
from kubernetes.client.exceptions import ApiException
from kubernetes.client.models import V1PodStatus

logger = logging.getLogger(__name__)
Expand All @@ -26,6 +28,8 @@
class StatusData:
def __init__(self):
self.status_data = {}
self.k8s_api_client = None
self.namespace = "default"

@staticmethod
def determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str]:
Expand Down Expand Up @@ -114,6 +118,57 @@ def get_timestamp_as_str() -> str:
)
return current_utc_time

def set_k8s_api_client(self, k8s_api_client: client.CoreV1Api, namespace: str):
self.k8s_api_client = k8s_api_client
self.namespace = namespace

def fetch_status_from_k8s_api(self, release: str) -> Tuple[str, str, str]:
"""
Get the actual status of a release from k8s via the client API.
Because this can be as costly operation it is only used at critical times such as deleted pods.
Returns:
- A tuple consisting of status, pod_message, container_message
"""
logger.debug(
f"Getting the status of release {release} directly from k8s via the api client"
)

status = "Unset"
pod_message = container_message = ""

try:
api_response = self.k8s_api_client.list_namespaced_pod(
self.namespace, limit=500, timeout_seconds=120, watch=False
)

for pod in api_response.items:
if pod.metadata.labels.get("release") == release:
pod_status = StatusData.determine_status_from_k8s(pod.status)
if status == "Unset":
status = pod_status
logger.debug(
f"Preliminary status of release {release} set from Unset to {status}"
)
elif status == "Deleted":
# All other statuses override Deleted
status = pod_status
logger.debug(
f"Preliminary status of release {release} set from Deleted to {status}"
)
elif pod_status == "Running":
# Running overrides all other statuses
status = pod_status
logger.debug(
f"Preliminary status of release {release} set to {status}"
)
except ApiException as e:
logger.warning(
f"Exception when calling CoreV1Api->list_namespaced_pod. {e}"
)

return status, pod_message, container_message

def update(self, event: dict) -> None:
"""
Process a Kubernetes pod event and update the status_data.
Expand All @@ -122,7 +177,7 @@ def update(self, event: dict) -> None:
- event (dict): The Kubernetes pod event.
- status_data (dict): Dictionary containing status info.
Returns:
Sets:
- status_data (dict): Updated dictionary containing status info.
- release (str): The release of the updated status
"""
Expand Down Expand Up @@ -218,10 +273,20 @@ def update_or_create_status(
release not in status_data
or creation_timestamp >= status_data[release]["creation_timestamp"]
):

status = "Deleted" if deletion_timestamp else status

if status == "Deleted":
# Status Deleted is a destructive action
# Therefore we double-check the k8s status directly upon detecting this
status = self.fetch_status_from_k8s_api(release)
if status != "Deleted":
deletion_timestamp = None

status_data[release] = {
"creation_timestamp": creation_timestamp,
"deletion_timestamp": deletion_timestamp,
"status": "Deleted" if deletion_timestamp else status,
"status": status,
"event-ts": StatusData.get_timestamp_as_str(),
"sent": False,
}
Expand Down
1 change: 1 addition & 0 deletions serve_event_listener/status_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def process(self):
release = status_data["release"]
new_status = status_data["new-status"]
if new_status == "Deleted":
# TODO: Be careful with deleting releases! Check the truth in k8s.
logger.info(
f"Processing release: {release}. New status is Deleted!"
)
Expand Down

0 comments on commit 20c8d09

Please sign in to comment.