diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..79629ab --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,70 @@ +name: CI + +on: + push: + branches: [ "develop" ] + paths-ignore: + - '**.md' + + + # Adds ability to run this workflow manually + workflow_dispatch: + + +jobs: + + lint: + name: Run linting + runs-on: ubuntu-latest + env: + HADOLINT_RECURSIVE: "true" + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Run linter black + uses: psf/black@stable + with: + options: "--check" + + - name: Run hadolint + uses: hadolint/hadolint-action@v3.1.0 + with: + dockerfile: "Dockerfile*" + + + tests: + name: Run unit-tests + runs-on: ubuntu-latest + + strategy: + matrix: + python-version: [3.12] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Run tests + run: python -m unittest discover -s tests + + + build: + name: Build a CI image + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Build the Docker image + run: docker build . 
--file Dockerfile --tag ci:$(date +%s) diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index e1f4dd0..ad89ca8 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -27,7 +27,7 @@ jobs: strategy: matrix: - python-version: [3.8, 3.9, 3.11, 3.12] + python-version: [3.12] steps: - name: Checkout code diff --git a/.gitignore b/.gitignore index 37577ba..e9d687e 100644 --- a/.gitignore +++ b/.gitignore @@ -151,6 +151,12 @@ node_modules .DS_Store .idea/ +# VSCode configurations +.vscode + # The media folder contents media/* !media/.gitkeep + +# Local only files +.local-only/ diff --git a/serve_event_listener/event_listener.py b/serve_event_listener/event_listener.py index 2b91866..5bfc264 100644 --- a/serve_event_listener/event_listener.py +++ b/serve_event_listener/event_listener.py @@ -77,10 +77,11 @@ def setup(self, **kwargs: Optional[Any]) -> None: "\n\n\t{}\n\t Running Setup Process \n\t{}\n".format("#" * 30, "#" * 30) ) try: - self.check_status() + self.check_serve_api_status() self.setup_client() self.token = self.fetch_token() self._status_data = StatusData() + self._status_data.set_k8s_api_client(self.client, self.namespace) self._status_queue = StatusQueue(self.post, self.token) self.setup_complete = True except Exception as e: @@ -99,6 +100,8 @@ def listen(self) -> None: ) max_retries = 10 + + # Duration in seconds to wait between retrying used when some exceptions occur retry_delay = 2 if self.setup_complete: @@ -147,9 +150,9 @@ def listen(self) -> None: else: logger.warning("Setup not completed - run .setup() first") - def check_status(self) -> bool: + def check_serve_api_status(self) -> bool: """ - Checks the status of the EventListener. + Checks the status of the Serve API. Returns: - bool: True if the status is okay, False otherwise. 
@@ -185,8 +188,30 @@ def setup_client(self) -> None: logger.info("Kubernetes client successfully set") self.client = client.CoreV1Api() + + # self.list_all_pods() + self.watch = watch.Watch() + def list_all_pods(self): + logger.info("Listing all pods and their status codes") + + try: + api_response = self.client.list_namespaced_pod( + self.namespace, limit=500, timeout_seconds=120, watch=False + ) + + for pod in api_response.items: + release = pod.metadata.labels.get("release") + app_status = StatusData.determine_status_from_k8s(pod.status) + logger.info( + f"Release={release}, {pod.metadata.name} with status {app_status}" + ) + except ApiException as e: + logger.warning( + f"Exception when calling CoreV1Api->list_namespaced_pod. {e}" + ) + def fetch_token(self): """ Retrieve an authentication token by sending a POST request with the provided data. @@ -251,7 +276,7 @@ def post( elif status_code in [401, 403]: logger.warning( - f"Recieved status code {status_code} - Fetching new token and retrying once" + f"Received status code {status_code} - Fetching new token and retrying once" ) self.token = self.fetch_token() self._status_queue.token = self.token @@ -263,17 +288,17 @@ def post( elif status_code in [404]: logger.warning( - f"Recieved status code {status_code} - {response.text}" + f"Received status code {status_code} - {response.text}" ) break elif str(status_code).startswith("5"): - logger.warning(f"Recieved status code {status_code}") + logger.warning(f"Received status code {status_code}") logger.warning(f"Retrying in {sleep} seconds") time.sleep(sleep) else: - logger.warning(f"Recieved uncaught status code: {status_code}") + logger.warning(f"Received uncaught status code: {status_code}") logger.info(f"POST returned - Status code: {status_code}") @@ -289,10 +314,10 @@ def post( def get(self, url: str, headers: Union[None, dict] = None): """ - Send a POST request to the specified URL with the provided data and token. 
+ Send a GET request to the specified URL with the provided data and token. Args: - url (str): The URL to send the POST request to. + url (str): The URL to send the GET request to. data (dict): The data to be included in the POST request. header (None or dict): header for the request. diff --git a/serve_event_listener/status_data.py b/serve_event_listener/status_data.py index 8ce5617..436543c 100644 --- a/serve_event_listener/status_data.py +++ b/serve_event_listener/status_data.py @@ -4,6 +4,8 @@ from typing import Any, Dict, Tuple, Union import requests +from kubernetes import client +from kubernetes.client.exceptions import ApiException from kubernetes.client.models import V1PodStatus logger = logging.getLogger(__name__) @@ -26,6 +28,150 @@ class StatusData: def __init__(self): self.status_data = {} + self.k8s_api_client = None + self.namespace = "default" + + @staticmethod + def determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str]: + """ + Get the status of a Kubernetes pod. + First checks init_container_statuses, then container_statuses + Properties used to translate the pod status: + - container.state + - state.terminated.reason + - state.waiting, waiting.reason + - state.running and container_status.ready + + Parameters: + - status_object (dict): The Kubernetes status object. 
+ + Returns: + - Tuple[str, str, str]: The status of the pod, container message, pod message + """ + empty_message = "" + pod_message = status_object.message if status_object.message else empty_message + + def process_container_statuses(container_statuses, init_containers=False): + for container_status in container_statuses: + state = container_status.state + + terminated = state.terminated + if terminated: + if init_containers and terminated.reason == "Completed": + break + else: + return ( + StatusData.get_mapped_status(terminated.reason), + terminated.message if terminated.message else empty_message, + pod_message, + ) + + waiting = state.waiting + + if waiting: + return ( + StatusData.get_mapped_status(waiting.reason), + waiting.message if waiting.message else empty_message, + pod_message, + ) + else: + running = state.running + ready = container_status.ready + if running and ready: + return "Running", empty_message, pod_message + else: + return "Pending", empty_message, pod_message + else: + return None + + init_container_statuses = status_object.init_container_statuses + container_statuses = status_object.container_statuses + + if init_container_statuses is not None: + result = process_container_statuses( + init_container_statuses, init_containers=True + ) + if result: + return result + + if container_statuses is not None: + result = process_container_statuses(container_statuses) + if result: + return result + + return status_object.phase, empty_message, pod_message + + @staticmethod + def get_mapped_status(reason: str) -> str: + return K8S_STATUS_MAP.get(reason, reason) + + @staticmethod + def get_timestamp_as_str() -> str: + """ + Get the current UTC time as a formatted string. + + Returns: + str: The current UTC time in ISO format with milliseconds. 
+ """ + current_utc_time = ( + datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + ) + return current_utc_time + + def set_k8s_api_client(self, k8s_api_client: client.CoreV1Api, namespace: str): + self.k8s_api_client = k8s_api_client + self.namespace = namespace + + def fetch_status_from_k8s_api(self, release: str) -> Tuple[str, str, str]: + """ + Get the actual status of a release from k8s via the client API. + Because this can be a costly operation it is only used at critical times such as deleted pods. + + Returns: + - Tuple[str, str, str]: The status of the pod, container message, pod message + + If no pod matches the release, then return None, "", "" + """ + logger.debug( + f"Getting the status of release {release} directly from k8s via the api client" + ) + + status = None + container_message = pod_message = "" + + try: + api_response = self.k8s_api_client.list_namespaced_pod( + self.namespace, limit=500, timeout_seconds=120, watch=False + ) + + for pod in api_response.items: + if pod.metadata.labels.get("release") == release: + pod_status, container_message, pod_message = ( + StatusData.determine_status_from_k8s(pod.status) + ) + if status is None: + status = pod_status + logger.debug( + f"Preliminary status of release {release} set from None to {status}" + ) + elif status == "Deleted": + # All other statuses override Deleted + status = pod_status + logger.debug( + f"Preliminary status of release {release} set from Deleted to {status}" + ) + elif pod_status == "Running": + # Running overrides all other statuses + status = pod_status + logger.debug( + f"Preliminary status of release {release} set to {status}" + ) + except ApiException as e: + logger.warning( + f"Exception when calling CoreV1Api->list_namespaced_pod. {e}" + ) + + return status, container_message, pod_message def update(self, event: dict) -> None: """ @@ -35,7 +181,7 @@ def update(self, event: dict) -> None: - event (dict): The Kubernetes pod event. 
- status_data (dict): Dictionary containing status info. - Returns: + Sets: - status_data (dict): Updated dictionary containing status info. - release (str): The release of the updated status """ @@ -47,7 +193,9 @@ def update(self, event: dict) -> None: if pod: status_object = pod.status - status, pod_message, container_message = self.get_status(status_object) + status, container_message, pod_message = ( + StatusData.determine_status_from_k8s(status_object) + ) release = pod.metadata.labels.get("release") logger.debug(f"Event triggered from release {release}") @@ -98,69 +246,6 @@ def get_post_data(self) -> dict: logger.debug("Converting to POST data") return post_data - def get_status(self, status_object: V1PodStatus) -> Tuple[str, str, str]: - """ - Get the status of a Kubernetes pod. - - Parameters: - - status_object (dict): The Kubernetes status object. - - Returns: - - str: The status of the pod. - """ - empty_message = "" - pod_message = status_object.message if status_object.message else empty_message - - def process_container_statuses(container_statuses, init_containers=False): - for container_status in container_statuses: - state = container_status.state - - terminated = state.terminated - if terminated: - if init_containers and terminated.reason == "Completed": - break - else: - return ( - self.mapped_status(terminated.reason), - terminated.message if terminated.message else empty_message, - pod_message, - ) - - waiting = state.waiting - - if waiting: - return ( - self.mapped_status(waiting.reason), - waiting.message if waiting.message else empty_message, - pod_message, - ) - else: - running = state.running - ready = container_status.ready - if running and ready: - return "Running", empty_message, pod_message - else: - return "Pending", empty_message, pod_message - else: - return None - - init_container_statuses = status_object.init_container_statuses - container_statuses = status_object.container_statuses - - if init_container_statuses is not None: - 
result = process_container_statuses( - init_container_statuses, init_containers=True - ) - if result: - return result - - if container_statuses is not None: - result = process_container_statuses(container_statuses) - if result: - return result - - return status_object.phase, empty_message, pod_message - def update_or_create_status( self, status_data: Dict[str, Any], @@ -182,15 +267,39 @@ def update_or_create_status( Returns: Dict: Updated status data. """ + + logger.debug( + f"Release {release}. Status data before update:{status_data}. \ + {(release in status_data)=}? \ + creation_timestamp={creation_timestamp}, deletion_timestamp={deletion_timestamp}" + ) if ( release not in status_data or creation_timestamp >= status_data[release]["creation_timestamp"] ): + + status = "Deleted" if deletion_timestamp else status + + if status == "Deleted": + # Status Deleted is a destructive action + # Therefore we double-check the k8s status directly upon detecting this + if self.k8s_api_client: + # Only use if the k8s client api has been set + # Unit tests for example do not currently set a k8s api + status, *_ = self.fetch_status_from_k8s_api(release) + + if status is None: + # No pod with this release found. Set status to Deleted + status = "Deleted" + + if status != "Deleted": + deletion_timestamp = None + status_data[release] = { "creation_timestamp": creation_timestamp, "deletion_timestamp": deletion_timestamp, - "status": "Deleted" if deletion_timestamp else status, - "event-ts": self.get_timestamp_as_str(), + "status": status, + "event-ts": StatusData.get_timestamp_as_str(), "sent": False, } logger.debug( @@ -206,18 +315,3 @@ def get_latest_release(self): self.status_data, key=lambda k: self.status_data[k]["event-ts"] ) return latest_release - - def mapped_status(self, reason: str) -> str: - return K8S_STATUS_MAP.get(reason, reason) - - def get_timestamp_as_str(self) -> str: - """ - Get the current UTC time as a formatted string. 
- - Returns: - str: The current UTC time in ISO format with milliseconds. - """ - current_utc_time = ( - datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - ) - return current_utc_time diff --git a/serve_event_listener/status_queue.py b/serve_event_listener/status_queue.py index 4f62bff..a2862db 100644 --- a/serve_event_listener/status_queue.py +++ b/serve_event_listener/status_queue.py @@ -26,7 +26,15 @@ def add(self, status_data): def process(self): while not self.stop_event.is_set(): try: - status_data = self.queue.get(timeout=2) # Wait for 1 second + status_data = self.queue.get(timeout=2) # Wait for 2 seconds + + release = status_data["release"] + new_status = status_data["new-status"] + + if new_status == "Deleted": + logger.info( + f"Processing release: {release}. New status is Deleted!" + ) self.post_handler( data=status_data, @@ -35,7 +43,9 @@ def process(self): self.queue.task_done() - logger.debug("Processing queue successfully") + logger.debug( + f"Processed queue successfully of release {release}, new status={new_status}" + ) except queue.Empty: pass # Continue looping if the queue is empty diff --git a/tests/create_pods.py b/tests/create_pods.py index 4ff5c0b..1246af6 100644 --- a/tests/create_pods.py +++ b/tests/create_pods.py @@ -88,3 +88,77 @@ def error_image_pull(self): def get_current_time(self): return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + + +class PodStatus(models.V1PodStatus): + + def __init__(self): + super().__init__() + self.init_container_statuses = [] + self.container_statuses = [] + + def add_init_container_status(self, state: str, reason: str, exit_code: int = 0): + """ + Adds an init container status object to the list of init container statuses. 
+ state one of: waiting, terminated + """ + + if state == "waiting": + container_state = models.V1ContainerState( + waiting=models.V1ContainerStateWaiting(reason=reason) + ) + elif state == "terminated": + container_state = models.V1ContainerState( + terminated=models.V1ContainerStateTerminated( + exit_code=exit_code, reason=reason + ) + ) + else: + container_state = models.V1ContainerState() + + container_status = models.V1ContainerStatus( + name="name", + state=container_state, + image="some image", + image_id="123", + ready=False, + restart_count=0, + ) + + self.init_container_statuses.append(container_status) + + def add_container_status( + self, state: str, reason: str, ready: bool = False, exit_code: int = 0 + ): + """ + Adds a regular, non-init container status object to the list of container statuses. + state one of: waiting, running, terminated + """ + + if state == "waiting": + container_state = models.V1ContainerState( + waiting=models.V1ContainerStateWaiting(reason=reason) + ) + elif state == "running": + container_state = models.V1ContainerState( + running=models.V1ContainerStateRunning() + ) + elif state == "terminated": + container_state = models.V1ContainerState( + terminated=models.V1ContainerStateTerminated( + exit_code=exit_code, reason=reason + ) + ) + else: + container_state = models.V1ContainerState() + + container_status = models.V1ContainerStatus( + name="name", + state=container_state, + image="some image", + image_id="123", + ready=ready, + restart_count=0, + ) + + self.container_statuses.append(container_status) diff --git a/tests/test_status_data.py b/tests/test_status_data.py index 2a1a293..e9acb21 100644 --- a/tests/test_status_data.py +++ b/tests/test_status_data.py @@ -1,7 +1,8 @@ +import time import unittest from serve_event_listener.status_data import StatusData -from tests.create_pods import Pod +from tests.create_pods import Pod, PodStatus class TestPodProcessing(unittest.TestCase): @@ -74,19 +75,222 @@ def test_replica_scenario(self): 
self.new_pod.create(release) self.status_data.update({"object": self.new_pod}) + time.sleep(0.01) + self.pod.delete() self.status_data.update({"object": self.pod}) + time.sleep(0.01) + self.new_pod.running() self.status_data.update({"object": self.new_pod}) self.assertEqual(self.status_data.status_data[release].get("status"), "Running") + time.sleep(0.01) + self.new_pod.delete() self.status_data.update({"object": self.new_pod}) self.assertEqual(self.status_data.status_data[release].get("status"), "Deleted") + def test_valid_and_invalid_image_edits(self): + """ + This scenario creates a pod, then creates a pod with an invalid image, and finally + it creates a pod with a valid image. + After the third pod is created, the first two are deleted. + This occurs when a user changes the image to an invalid image and then to a valid image. + """ + + release = "r-valid-invalid-images" + + # Pod: pod + self.pod.create(release) + self.status_data.update({"object": self.pod}) + + assert self.status_data.status_data[release].get("status") == "Created" + + time.sleep(0.01) + + self.pod.running() + self.status_data.update({"object": self.pod}) + assert self.status_data.status_data[release].get("status") == "Running" + + # Pod: invalid_pod + self.invalid_pod = Pod() + self.invalid_pod.create(release) + + time.sleep(0.01) + + self.invalid_pod.error_image_pull() + self.status_data.update({"object": self.invalid_pod}) + assert self.status_data.status_data[release].get("status") == "Image Error" + + # Now there are two pods in the release, one older Running and one newer Image Error + + # Pod: valid_pod + self.valid_pod = Pod() + self.valid_pod.create(release) + + time.sleep(0.01) + + self.valid_pod.running() + self.status_data.update({"object": self.valid_pod}) + assert self.status_data.status_data[release].get("status") == "Running" + + # The first two pods are deleted but the last pod should remain running + self.pod.delete() + self.invalid_pod.delete() + + 
self.status_data.update({"object": self.pod}) + + msg = f"Release created ts={self.status_data.status_data[release].get("creation_timestamp")}, \ + deleted ts={self.status_data.status_data[release].get("deletion_timestamp")}" + + print(msg) + + self.assertEqual( + self.status_data.status_data[release].get("status"), + "Running", + f"Release should be Running after delete of first pod, \ + ts pod deleted={self.pod.metadata.deletion_timestamp} vs \ + ts invalid_pod deleted={self.invalid_pod.metadata.deletion_timestamp} vs \ + ts valid_pod created={self.valid_pod.metadata.creation_timestamp}, {msg}", + ) + + self.status_data.update({"object": self.invalid_pod}) + self.assertEqual( + self.status_data.status_data[release].get("status"), + "Running", + f"Release should be Running after delete of 2nd invalid pod, \ + ts pod deleted={self.pod.metadata.deletion_timestamp} vs \ + ts invalid_pod deleted={self.invalid_pod.metadata.deletion_timestamp} vs \ + ts valid_pod created={self.valid_pod.metadata.creation_timestamp}, {msg}", + ) + + +class TestStatusConverter(unittest.TestCase): + """Verifies the translation logic of k8s status objects to app status codes. 
+ + This executes static method determine_status_from_k8s with signature: + determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str] + The response object has structure: status_object: phase, message, pod_message + """ + + def test_waiting_container_reason_pending(self): + """ + This scenario tests a k8s pod status object with a container with the following status attributes: + state=waiting, reason=PodInitializing + """ + podstatus = PodStatus() + podstatus.add_container_status("waiting", "PodInitializing") + expected = ("Pending", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_running_container_status_not_ready(self): + """ + This scenario tests a k8s pod status object with a container with the following status attributes: + state=running, ready=false + """ + podstatus = PodStatus() + podstatus.add_container_status("running", None, ready=False) + expected = ("Pending", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_running_container_status_ready(self): + """ + This scenario tests a k8s pod status object with a container with the following status attributes: + state=running, ready=true + """ + podstatus = PodStatus() + podstatus.add_container_status("running", None, ready=True) + expected = ("Running", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_deleted_container(self): + """ + This scenario tests a k8s pod status object with a container with the following status attributes: + state=terminated, terminated reason="Terminated", message="Deleted", exit_code=1 + """ + podstatus = PodStatus() + podstatus.add_container_status( + "terminated", "Terminated", ready=False, exit_code=1 + ) + expected = ("Terminated", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def 
test_terminated_init_container_reason_error(self): + """ + This scenario tests a k8s pod status object with an init container with the following status attributes: + Input: state=terminated, terminated.reason=PostStartHookError + The Terminated exit code is set to 137 which could indicate for example that the container ran out of memory. + See https://containersolutions.github.io/runbooks/posts/kubernetes/crashloopbackoff/#step-3 + """ + podstatus = PodStatus() + podstatus.add_init_container_status( + "terminated", "PostStartHookError", exit_code=137 + ) + expected = ("Pod Error", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_waiting_init_container_reason_error(self): + """ + This scenario tests a k8s pod status object with an init container with the following status attributes: + Input: state=waiting, waiting.reason=PostStartHookError + """ + podstatus = PodStatus() + podstatus.add_init_container_status("waiting", "PostStartHookError") + expected = ("Pod Error", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_multiple_container_status_terminated_init_container_running_container( + self, + ): + """ + This scenario tests a k8s pod status object with two containers. 
+ Init container input: state=terminated, terminated.reason=Completed + Regular container input: state=running, ready=true + """ + podstatus = PodStatus() + podstatus.add_init_container_status("terminated", "Completed") + podstatus.add_container_status("running", "", ready=True) + expected = ("Running", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_missing_container_status(self): + """This scenario tests an empty k8s pod status object.""" + podstatus = PodStatus() + expected = (None, "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + +class TestStatusDataUtilities(unittest.TestCase): + """Verifies the app status utility methods.""" + + def test_mapped_status(self): + """Test the mapped status codes. Not all codes need to be tested.""" + actual = StatusData.get_mapped_status("CrashLoopBackOff") + self.assertEqual(actual, "Error") + + actual = StatusData.get_mapped_status("Completed") + self.assertEqual(actual, "Retrying...") + + actual = StatusData.get_mapped_status("ErrImagePull") + self.assertEqual(actual, "Image Error") + + def test_mapped_status_nonexisting_code(self): + """Test the mapped status codes in a scenario with a non-existing code.""" + actual = StatusData.get_mapped_status("NonexistingCode") + self.assertEqual(actual, "NonexistingCode") + if __name__ == "__main__": unittest.main()