From c5a9cc777e154f162cd644e757a83350c3e41272 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Thu, 15 Aug 2024 12:04:25 +0200 Subject: [PATCH 01/14] Adding CI action with linting and tests --- .github/workflows/ci.yaml | 66 +++++++++++++++++++++++++++++++++++++++ .gitignore | 3 ++ 2 files changed, 69 insertions(+) create mode 100644 .github/workflows/ci.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..05214af --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,66 @@ +name: CI + +on: + push: + branches: [ "develop" ] + paths-ignore: + - '**.md' + + + # Adds ability to run this workflow manually + workflow_dispatch: + + +jobs: + + lint: + name: Run linting + runs-on: ubuntu-latest + env: + HADOLINT_RECURSIVE: "true" + + steps: + - uses: psf/black@stable + with: + options: "-l 120 --check" + + - uses: actions/checkout@v3 + - uses: hadolint/hadolint-action@v3.1.0 + with: + dockerfile: "Dockerfile*" + + + tests: + name: Run unit-tests + runs-on: ubuntu-latest + + strategy: + matrix: + python-version: [3.9, 3.11, 3.12] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Run tests + run: python -m unittest discover -s tests + + + build: + name: Build a ci image + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Build the Docker image + run: docker build . 
--file Dockerfile --tag ci:$(date +%s) diff --git a/.gitignore b/.gitignore index 37577ba..f9f24dc 100644 --- a/.gitignore +++ b/.gitignore @@ -154,3 +154,6 @@ node_modules # The media folder contents media/* !media/.gitkeep + +# Local only files +.local-only/ From c907ce5002c86f10c803a67f6ae695be4b2e742c Mon Sep 17 00:00:00 2001 From: alfredeen Date: Thu, 15 Aug 2024 12:11:49 +0200 Subject: [PATCH 02/14] Minor CI action improvements --- .github/workflows/ci.yaml | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 05214af..5ed4dae 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -20,12 +20,16 @@ jobs: HADOLINT_RECURSIVE: "true" steps: - - uses: psf/black@stable + - name: Checkout code + uses: actions/checkout@v3 + + - name: Run linter black + uses: psf/black@stable with: options: "-l 120 --check" - - uses: actions/checkout@v3 - - uses: hadolint/hadolint-action@v3.1.0 + - name: Run hadolint + uses: hadolint/hadolint-action@v3.1.0 with: dockerfile: "Dockerfile*" @@ -57,7 +61,7 @@ jobs: build: - name: Build a ci image + name: Build a CI image runs-on: ubuntu-latest steps: From d8c72bff3f9543c3a30830a76bb525c63138f8b5 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Thu, 15 Aug 2024 12:15:57 +0200 Subject: [PATCH 03/14] Aligned CI black linter with this repos set line length --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5ed4dae..1be4f79 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -26,7 +26,7 @@ jobs: - name: Run linter black uses: psf/black@stable with: - options: "-l 120 --check" + options: "--check" - name: Run hadolint uses: hadolint/hadolint-action@v3.1.0 From c196b01b3c8ac5fd108d48a970346c53286e17ee Mon Sep 17 00:00:00 2001 From: alfredeen Date: Fri, 16 Aug 2024 16:48:49 +0200 Subject: [PATCH 04/14] Minor corrections 
to spelling --- .gitignore | 3 +++ serve_event_listener/event_listener.py | 10 ++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index f9f24dc..e9d687e 100644 --- a/.gitignore +++ b/.gitignore @@ -151,6 +151,9 @@ node_modules .DS_Store .idea/ +# VSCode configurations +.vscode + # The media folder contents media/* !media/.gitkeep diff --git a/serve_event_listener/event_listener.py b/serve_event_listener/event_listener.py index 2b91866..d97a17d 100644 --- a/serve_event_listener/event_listener.py +++ b/serve_event_listener/event_listener.py @@ -99,6 +99,8 @@ def listen(self) -> None: ) max_retries = 10 + + # Duration in seconds to wait between retrying used when some exceptions occur retry_delay = 2 if self.setup_complete: @@ -251,7 +253,7 @@ def post( elif status_code in [401, 403]: logger.warning( - f"Recieved status code {status_code} - Fetching new token and retrying once" + f"Received status code {status_code} - Fetching new token and retrying once" ) self.token = self.fetch_token() self._status_queue.token = self.token @@ -263,17 +265,17 @@ def post( elif status_code in [404]: logger.warning( - f"Recieved status code {status_code} - {response.text}" + f"Received status code {status_code} - {response.text}" ) break elif str(status_code).startswith("5"): - logger.warning(f"Recieved status code {status_code}") + logger.warning(f"Received status code {status_code}") logger.warning(f"Retrying in {sleep} seconds") time.sleep(sleep) else: - logger.warning(f"Recieved uncaught status code: {status_code}") + logger.warning(f"Received uncaught status code: {status_code}") logger.info(f"POST returned - Status code: {status_code}") From bedddba700022acdd6189af7cbd2dc72d6e06c3e Mon Sep 17 00:00:00 2001 From: alfredeen Date: Tue, 20 Aug 2024 15:05:22 +0200 Subject: [PATCH 05/14] More logging and corrected some comments --- serve_event_listener/event_listener.py | 6 +++--- serve_event_listener/status_data.py | 3 +++ 
serve_event_listener/status_queue.py | 9 +++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/serve_event_listener/event_listener.py b/serve_event_listener/event_listener.py index d97a17d..8afd241 100644 --- a/serve_event_listener/event_listener.py +++ b/serve_event_listener/event_listener.py @@ -151,7 +151,7 @@ def listen(self) -> None: def check_status(self) -> bool: """ - Checks the status of the EventListener. + Checks the status of the Serve API. Returns: - bool: True if the status is okay, False otherwise. @@ -291,10 +291,10 @@ def post( def get(self, url: str, headers: Union[None, dict] = None): """ - Send a POST request to the specified URL with the provided data and token. + Send a GET request to the specified URL with the provided data and token. Args: - url (str): The URL to send the POST request to. + url (str): The URL to send the GET request to. data (dict): The data to be included in the POST request. header (None or dict): header for the request. diff --git a/serve_event_listener/status_data.py b/serve_event_listener/status_data.py index 8ce5617..97de6a3 100644 --- a/serve_event_listener/status_data.py +++ b/serve_event_listener/status_data.py @@ -182,6 +182,9 @@ def update_or_create_status( Returns: Dict: Updated status data. """ + logger.debug(f"Release {release}. Status data before update:{status_data}. \ + release in status data? {release not in status_data}. 
\ + creation_timestamp={creation_timestamp}, deletion_timestamp={deletion_timestamp}") if ( release not in status_data or creation_timestamp >= status_data[release]["creation_timestamp"] diff --git a/serve_event_listener/status_queue.py b/serve_event_listener/status_queue.py index 4f62bff..ff958d2 100644 --- a/serve_event_listener/status_queue.py +++ b/serve_event_listener/status_queue.py @@ -26,7 +26,12 @@ def add(self, status_data): def process(self): while not self.stop_event.is_set(): try: - status_data = self.queue.get(timeout=2) # Wait for 1 second + status_data = self.queue.get(timeout=2) # Wait for 2 seconds + + release = status_data["release"] + new_status = status_data["new-status"] + if new_status == "Deleted": + logger.info(f"Processing release: {release}. New status is Deleted!") self.post_handler( data=status_data, @@ -35,7 +40,7 @@ def process(self): self.queue.task_done() - logger.debug("Processing queue successfully") + logger.debug(f"Processed queue successfully of release {release}, new status={new_status}") except queue.Empty: pass # Continue looping if the queue is empty From a9308c9aeaebbef03e21befb4407af09d48df97c Mon Sep 17 00:00:00 2001 From: alfredeen Date: Tue, 20 Aug 2024 16:05:00 +0200 Subject: [PATCH 06/14] Added a unit test for scenario of editing a release with valid image to invalid image back to valid image. --- serve_event_listener/status_data.py | 6 ++- serve_event_listener/status_queue.py | 8 +++- tests/test_status_data.py | 66 ++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/serve_event_listener/status_data.py b/serve_event_listener/status_data.py index 97de6a3..0d11bfb 100644 --- a/serve_event_listener/status_data.py +++ b/serve_event_listener/status_data.py @@ -182,9 +182,11 @@ def update_or_create_status( Returns: Dict: Updated status data. """ - logger.debug(f"Release {release}. Status data before update:{status_data}. \ + logger.debug( + f"Release {release}. 
Status data before update:{status_data}. \ release in status data? {release not in status_data}. \ - creation_timestamp={creation_timestamp}, deletion_timestamp={deletion_timestamp}") + creation_timestamp={creation_timestamp}, deletion_timestamp={deletion_timestamp}" + ) if ( release not in status_data or creation_timestamp >= status_data[release]["creation_timestamp"] diff --git a/serve_event_listener/status_queue.py b/serve_event_listener/status_queue.py index ff958d2..ee77a05 100644 --- a/serve_event_listener/status_queue.py +++ b/serve_event_listener/status_queue.py @@ -31,7 +31,9 @@ def process(self): release = status_data["release"] new_status = status_data["new-status"] if new_status == "Deleted": - logger.info(f"Processing release: {release}. New status is Deleted!") + logger.info( + f"Processing release: {release}. New status is Deleted!" + ) self.post_handler( data=status_data, @@ -40,7 +42,9 @@ def process(self): self.queue.task_done() - logger.debug(f"Processed queue successfully of release {release}, new status={new_status}") + logger.debug( + f"Processed queue successfully of release {release}, new status={new_status}" + ) except queue.Empty: pass # Continue looping if the queue is empty diff --git a/tests/test_status_data.py b/tests/test_status_data.py index 2a1a293..a2c9e08 100644 --- a/tests/test_status_data.py +++ b/tests/test_status_data.py @@ -87,6 +87,72 @@ def test_replica_scenario(self): self.assertEqual(self.status_data.status_data[release].get("status"), "Deleted") + def test_valid_and_invalid_image_edits(self): + """ + This scenario creates a pod, then creates a pod with an invalid image, and finally + it created a pod with a valid image. + After the third pod is created, the first two are deleted. + This occurs when a user chnages the image to an invalid image and then valid image. 
+ """ + + release = "r-valid-invalid-images" + + # Pod: pod + self.pod.create(release) + self.status_data.update({"object": self.pod}) + + assert self.status_data.status_data[release].get("status") == "Created" + + self.pod.running() + self.status_data.update({"object": self.pod}) + assert self.status_data.status_data[release].get("status") == "Running" + + # Pod: invalid_pod + self.invalid_pod = Pod() + self.invalid_pod.create(release) + self.invalid_pod.error_image_pull() + self.status_data.update({"object": self.invalid_pod}) + assert self.status_data.status_data[release].get("status") == "Image Error" + + # Now there are two pods in the release, one older Running and one newer Image Error + + # Pod: valid_pod + self.valid_pod = Pod() + self.valid_pod.create(release) + self.valid_pod.running() + self.status_data.update({"object": self.valid_pod}) + assert self.status_data.status_data[release].get("status") == "Running" + + # The first two pods are deleted but the last pod should remain running + self.pod.delete() + self.invalid_pod.delete() + + self.status_data.update({"object": self.pod}) + + msg = f"Release created ts={self.status_data.status_data[release].get("creation_timestamp")}, \ + deleted ts={self.status_data.status_data[release].get("deletion_timestamp")}" + + print(msg) + + self.assertEqual( + self.status_data.status_data[release].get("status"), + "Running", + f"Release should be Running after delete of first pod, \ + ts pod deleted={self.pod.metadata.deletion_timestamp} vs \ + ts invalid_pod deleted={self.invalid_pod.metadata.deletion_timestamp} vs \ + ts valid_pod created={self.valid_pod.metadata.creation_timestamp}, {msg}", + ) + + self.status_data.update({"object": self.invalid_pod}) + self.assertEqual( + self.status_data.status_data[release].get("status"), + "Running", + f"Release should be Running after delete of 2nd invalid pod, \ + ts pod deleted={self.pod.metadata.deletion_timestamp} vs \ + ts invalid_pod 
deleted={self.invalid_pod.metadata.deletion_timestamp} vs \ + ts valid_pod created={self.valid_pod.metadata.creation_timestamp}, {msg}", + ) + if __name__ == "__main__": unittest.main() From efb0e3bd5ffc14edc66821617ec456b3774e5570 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Thu, 22 Aug 2024 13:28:53 +0200 Subject: [PATCH 07/14] Refactoring status data to prepare for better unit test coverage. --- serve_event_listener/event_listener.py | 20 ++++++ serve_event_listener/status_data.py | 96 +++++++++++++++++++++----- tests/test_status_data.py | 24 +++++++ 3 files changed, 122 insertions(+), 18 deletions(-) diff --git a/serve_event_listener/event_listener.py b/serve_event_listener/event_listener.py index 8afd241..60c7c00 100644 --- a/serve_event_listener/event_listener.py +++ b/serve_event_listener/event_listener.py @@ -187,8 +187,28 @@ def setup_client(self) -> None: logger.info("Kubernetes client successfully set") self.client = client.CoreV1Api() + + # self.list_all_pods() + self.watch = watch.Watch() + def list_all_pods(self): + logger.info("Listing all pods and status codes") + + try: + api_response = self.client.list_namespaced_pod( + self.namespace, limit=500, timeout_seconds=120, watch=False + ) + + for pod in api_response.items: + # TODO get_status() similar to logic in StatusData + # Make this a helper function and unit test? + print(f"{pod.metadata.name} with status {pod.status}") + except ApiException as e: + logger.warning( + f"Exception when calling CoreV1Api->list_namespaced_pod. {e}" + ) + def fetch_token(self): """ Retrieve an authentication token by sending a POST request with the provided data. 
diff --git a/serve_event_listener/status_data.py b/serve_event_listener/status_data.py index 0d11bfb..464cde1 100644 --- a/serve_event_listener/status_data.py +++ b/serve_event_listener/status_data.py @@ -27,6 +27,80 @@ class StatusData: def __init__(self): self.status_data = {} + """ + @staticmethod + def determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str]: + # TODO: a copy of function from StatusData + empty_message = "" + pod_message = status_object.message if status_object.message else empty_message + + def process_container_statuses(container_statuses, init_containers=False): + for container_status in container_statuses: + state = container_status.state + + terminated = state.terminated + if terminated: + if init_containers and terminated.reason == "Completed": + break + else: + return ( + self.mapped_status(terminated.reason), + terminated.message if terminated.message else empty_message, + pod_message, + ) + + waiting = state.waiting + + if waiting: + return ( + self.mapped_status(waiting.reason), + waiting.message if waiting.message else empty_message, + pod_message, + ) + else: + running = state.running + ready = container_status.ready + if running and ready: + return "Running", empty_message, pod_message + else: + return "Pending", empty_message, pod_message + else: + return None + + init_container_statuses = status_object.init_container_statuses + container_statuses = status_object.container_statuses + + if init_container_statuses is not None: + result = process_container_statuses( + init_container_statuses, init_containers=True + ) + if result: + return result + + if container_statuses is not None: + result = process_container_statuses(container_statuses) + if result: + return result + + return status_object.phase, empty_message, pod_message """ + + @staticmethod + def get_mapped_status(reason: str) -> str: + return K8S_STATUS_MAP.get(reason, reason) + + @staticmethod + def get_timestamp_as_str() -> str: + """ + Get the current 
UTC time as a formatted string. + + Returns: + str: The current UTC time in ISO format with milliseconds. + """ + current_utc_time = ( + datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + ) + return current_utc_time + def update(self, event: dict) -> None: """ Process a Kubernetes pod event and update the status_data. @@ -121,7 +195,7 @@ def process_container_statuses(container_statuses, init_containers=False): break else: return ( - self.mapped_status(terminated.reason), + StatusData.get_mapped_status(terminated.reason), terminated.message if terminated.message else empty_message, pod_message, ) @@ -130,7 +204,7 @@ def process_container_statuses(container_statuses, init_containers=False): if waiting: return ( - self.mapped_status(waiting.reason), + StatusData.get_mapped_status(waiting.reason), waiting.message if waiting.message else empty_message, pod_message, ) @@ -182,6 +256,7 @@ def update_or_create_status( Returns: Dict: Updated status data. """ + logger.debug( f"Release {release}. Status data before update:{status_data}. \ release in status data? {release not in status_data}. \ @@ -195,7 +270,7 @@ def update_or_create_status( "creation_timestamp": creation_timestamp, "deletion_timestamp": deletion_timestamp, "status": "Deleted" if deletion_timestamp else status, - "event-ts": self.get_timestamp_as_str(), + "event-ts": StatusData.get_timestamp_as_str(), "sent": False, } logger.debug( @@ -211,18 +286,3 @@ def get_latest_release(self): self.status_data, key=lambda k: self.status_data[k]["event-ts"] ) return latest_release - - def mapped_status(self, reason: str) -> str: - return K8S_STATUS_MAP.get(reason, reason) - - def get_timestamp_as_str(self) -> str: - """ - Get the current UTC time as a formatted string. - - Returns: - str: The current UTC time in ISO format with milliseconds. 
- """ - current_utc_time = ( - datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - ) - return current_utc_time diff --git a/tests/test_status_data.py b/tests/test_status_data.py index a2c9e08..8f87606 100644 --- a/tests/test_status_data.py +++ b/tests/test_status_data.py @@ -154,5 +154,29 @@ def test_valid_and_invalid_image_edits(self): ) +class TestStatusDataUtilities(unittest.TestCase): + """Verifies the app status utility methods.""" + + def test_mapped_status(self): + """Test the mapped status codes. + Not all codes need to be tested. + """ + + actual = StatusData.get_mapped_status("CrashLoopBackOff") + self.assertEqual(actual, "Error") + + actual = StatusData.get_mapped_status("Completed") + self.assertEqual(actual, "Retrying...") + + actual = StatusData.get_mapped_status("ErrImagePull") + self.assertEqual(actual, "Image Error") + + def test_mapped_status_nonexisting_code(self): + """Test the mapped status codes in a scenario with a non-existing code.""" + + actual = StatusData.get_mapped_status("NonexistingCode") + self.assertEqual(actual, "NonexistingCode") + + if __name__ == "__main__": unittest.main() From 370cfd4fa47944c7ce343be0af618542d8534c03 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Thu, 22 Aug 2024 15:31:33 +0200 Subject: [PATCH 08/14] Began unit test of method to convert k8s container status to app status. 
--- serve_event_listener/status_data.py | 90 +++++++---------------------- tests/create_pods.py | 32 ++++++++++ tests/test_status_data.py | 44 ++++++++++++-- 3 files changed, 91 insertions(+), 75 deletions(-) diff --git a/serve_event_listener/status_data.py b/serve_event_listener/status_data.py index 464cde1..10b350d 100644 --- a/serve_event_listener/status_data.py +++ b/serve_event_listener/status_data.py @@ -27,10 +27,23 @@ class StatusData: def __init__(self): self.status_data = {} - """ @staticmethod def determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str]: - # TODO: a copy of function from StatusData + """ + Get the status of a Kubernetes pod. + First checks init_container_statuses, then container_statuses + Properties used to translate the pod status: + - container.state + - state.terminated.reason + - state.waiting, waiting.reason + - state.running and container_status.ready + + Parameters: + - status_object (dict): The Kubernetes status object. + + Returns: + - str: The status of the pod. 
+ """ empty_message = "" pod_message = status_object.message if status_object.message else empty_message @@ -44,7 +57,7 @@ def process_container_statuses(container_statuses, init_containers=False): break else: return ( - self.mapped_status(terminated.reason), + StatusData.get_mapped_status(terminated.reason), terminated.message if terminated.message else empty_message, pod_message, ) @@ -53,7 +66,7 @@ def process_container_statuses(container_statuses, init_containers=False): if waiting: return ( - self.mapped_status(waiting.reason), + StatusData.get_mapped_status(waiting.reason), waiting.message if waiting.message else empty_message, pod_message, ) @@ -82,7 +95,7 @@ def process_container_statuses(container_statuses, init_containers=False): if result: return result - return status_object.phase, empty_message, pod_message """ + return status_object.phase, empty_message, pod_message @staticmethod def get_mapped_status(reason: str) -> str: @@ -121,7 +134,9 @@ def update(self, event: dict) -> None: if pod: status_object = pod.status - status, pod_message, container_message = self.get_status(status_object) + status, pod_message, container_message = ( + StatusData.determine_status_from_k8s(status_object) + ) release = pod.metadata.labels.get("release") logger.debug(f"Event triggered from release {release}") @@ -172,69 +187,6 @@ def get_post_data(self) -> dict: logger.debug("Converting to POST data") return post_data - def get_status(self, status_object: V1PodStatus) -> Tuple[str, str, str]: - """ - Get the status of a Kubernetes pod. - - Parameters: - - status_object (dict): The Kubernetes status object. - - Returns: - - str: The status of the pod. 
- """ - empty_message = "" - pod_message = status_object.message if status_object.message else empty_message - - def process_container_statuses(container_statuses, init_containers=False): - for container_status in container_statuses: - state = container_status.state - - terminated = state.terminated - if terminated: - if init_containers and terminated.reason == "Completed": - break - else: - return ( - StatusData.get_mapped_status(terminated.reason), - terminated.message if terminated.message else empty_message, - pod_message, - ) - - waiting = state.waiting - - if waiting: - return ( - StatusData.get_mapped_status(waiting.reason), - waiting.message if waiting.message else empty_message, - pod_message, - ) - else: - running = state.running - ready = container_status.ready - if running and ready: - return "Running", empty_message, pod_message - else: - return "Pending", empty_message, pod_message - else: - return None - - init_container_statuses = status_object.init_container_statuses - container_statuses = status_object.container_statuses - - if init_container_statuses is not None: - result = process_container_statuses( - init_container_statuses, init_containers=True - ) - if result: - return result - - if container_statuses is not None: - result = process_container_statuses(container_statuses) - if result: - return result - - return status_object.phase, empty_message, pod_message - def update_or_create_status( self, status_data: Dict[str, Any], diff --git a/tests/create_pods.py b/tests/create_pods.py index 4ff5c0b..be441fd 100644 --- a/tests/create_pods.py +++ b/tests/create_pods.py @@ -88,3 +88,35 @@ def error_image_pull(self): def get_current_time(self): return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + + +class PodStatus(models.V1PodStatus): + + def __init__(self): + super().__init__() + self.init_container_statuses = [] + self.container_statuses = [] + + def add_init_container_status(self, state: str, reason: str): + """ + state 
one of: waiting, + reason="ContainerCreating" + """ + + if state == "waiting": + container_state = models.V1ContainerState( + waiting=models.V1ContainerStateWaiting(reason=reason) + ) + else: + container_state = models.V1ContainerState() + + container_status = models.V1ContainerStatus( + name="name", + state=container_state, + image="some image", + image_id="123", + ready=False, + restart_count=0, + ) + + self.init_container_statuses.append(container_status) diff --git a/tests/test_status_data.py b/tests/test_status_data.py index 8f87606..d42d3be 100644 --- a/tests/test_status_data.py +++ b/tests/test_status_data.py @@ -1,7 +1,7 @@ import unittest from serve_event_listener.status_data import StatusData -from tests.create_pods import Pod +from tests.create_pods import Pod, PodStatus class TestPodProcessing(unittest.TestCase): @@ -154,14 +154,47 @@ def test_valid_and_invalid_image_edits(self): ) +class TestStatusConverter(unittest.TestCase): + """Verifies the translation logic of k8s status objects to app status codes.""" + + # determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str] + # status_object: phase, message, pod_message + + def test_empty_status(self): + """This scenario tests an empty k8s pod status object.""" + podstatus = PodStatus() + expected = (None, "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_terminated_init_container_reason_error(self): + """ + This scenario tests a k8s pod status object with a terminated init container. 
+ Input: terminated=true, init containers = true, terminated.reason = PostStartHookError + """ + podstatus = PodStatus() + podstatus.add_init_container_status("waiting", "PostStartHookError") + expected = ("Pod Error", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_terminated_init_container_reason_completed_etc(self): + """ + This scenario tests a k8s pod status object with a terminated init container. + Input: terminated=true, init containers = true, terminated.reason = Completed + """ + podstatus = PodStatus() + podstatus.add_init_container_status("waiting", "Completed") + expected = ("Pod Error", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + class TestStatusDataUtilities(unittest.TestCase): """Verifies the app status utility methods.""" def test_mapped_status(self): - """Test the mapped status codes. - Not all codes need to be tested. - """ - + """Test the mapped status codes. Not all codes need to be tested.""" actual = StatusData.get_mapped_status("CrashLoopBackOff") self.assertEqual(actual, "Error") @@ -173,7 +206,6 @@ def test_mapped_status(self): def test_mapped_status_nonexisting_code(self): """Test the mapped status codes in a scenario with a non-existing code.""" - actual = StatusData.get_mapped_status("NonexistingCode") self.assertEqual(actual, "NonexistingCode") From def58da7c2a99df63c281b92159e798ba30fdd76 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Thu, 22 Aug 2024 16:47:20 +0200 Subject: [PATCH 09/14] Added several unit test scenarios to verify the translation of k8s pod status objects to app status codes. 
--- tests/create_pods.py | 48 ++++++++++++++++++-- tests/test_status_data.py | 93 +++++++++++++++++++++++++++++++++------ 2 files changed, 125 insertions(+), 16 deletions(-) diff --git a/tests/create_pods.py b/tests/create_pods.py index be441fd..1246af6 100644 --- a/tests/create_pods.py +++ b/tests/create_pods.py @@ -97,16 +97,22 @@ def __init__(self): self.init_container_statuses = [] self.container_statuses = [] - def add_init_container_status(self, state: str, reason: str): + def add_init_container_status(self, state: str, reason: str, exit_code: int = 0): """ - state one of: waiting, - reason="ContainerCreating" + Adds an init container status object to the list of init container statuses. + state one of: waiting, terminated """ if state == "waiting": container_state = models.V1ContainerState( waiting=models.V1ContainerStateWaiting(reason=reason) ) + elif state == "terminated": + container_state = models.V1ContainerState( + terminated=models.V1ContainerStateTerminated( + exit_code=exit_code, reason=reason + ) + ) else: container_state = models.V1ContainerState() @@ -120,3 +126,39 @@ def add_init_container_status(self, state: str, reason: str): ) self.init_container_statuses.append(container_status) + + def add_container_status( + self, state: str, reason: str, ready: bool = False, exit_code: int = 0 + ): + """ + Adds an regular, non-init container status object to the list of container statuses. 
+ state one of: waiting, running + """ + + if state == "waiting": + container_state = models.V1ContainerState( + waiting=models.V1ContainerStateWaiting(reason=reason) + ) + elif state == "running": + container_state = models.V1ContainerState( + running=models.V1ContainerStateRunning() + ) + elif state == "terminated": + container_state = models.V1ContainerState( + terminated=models.V1ContainerStateTerminated( + exit_code=exit_code, reason=reason + ) + ) + else: + container_state = models.V1ContainerState() + + container_status = models.V1ContainerStatus( + name="name", + state=container_state, + image="some image", + image_id="123", + ready=ready, + restart_count=0, + ) + + self.container_statuses.append(container_status) diff --git a/tests/test_status_data.py b/tests/test_status_data.py index d42d3be..b9ad37a 100644 --- a/tests/test_status_data.py +++ b/tests/test_status_data.py @@ -155,40 +155,107 @@ def test_valid_and_invalid_image_edits(self): class TestStatusConverter(unittest.TestCase): - """Verifies the translation logic of k8s status objects to app status codes.""" + """Verifies the translation logic of k8s status objects to app status codes. 
- # determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str] - # status_object: phase, message, pod_message + This executes static method determine_status_from_k8s with signature: + determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str] + The response object has structure: status_object: phase, message, pod_message + """ - def test_empty_status(self): - """This scenario tests an empty k8s pod status object.""" + def test_waiting_container_reason_pending(self): + """ + This scenario tests a k8s pod status object with a container with the following status attributes: + state=waiting, reason=PodInitializing + """ podstatus = PodStatus() - expected = (None, "", "") + podstatus.add_container_status("waiting", "PodInitializing") + expected = ("Pending", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_running_container_status_not_ready(self): + """ + This scenario tests a k8s pod status object with a container with the following status attributes: + state=running, ready=false + """ + podstatus = PodStatus() + podstatus.add_container_status("running", None, ready=False) + expected = ("Pending", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_running_container_status_ready(self): + """ + This scenario tests a k8s pod status object with a container with the following status attributes: + state=running, ready=true + """ + podstatus = PodStatus() + podstatus.add_container_status("running", None, ready=True) + expected = ("Running", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_deleted_container(self): + """ + This scenario tests a k8s pod status object with a container with the following status attributes: + state=terminated, terminated reason="Terminated", message="Deleted", exit_code=1 + """ + podstatus = PodStatus() + 
podstatus.add_container_status( + "terminated", "Terminated", ready=False, exit_code=1 + ) + expected = ("Terminated", "", "") actual = StatusData.determine_status_from_k8s(podstatus) self.assertEqual(actual, expected) def test_terminated_init_container_reason_error(self): """ - This scenario tests a k8s pod status object with a terminated init container. - Input: terminated=true, init containers = true, terminated.reason = PostStartHookError + This scenario tests a k8s pod status object with an init container with the following status attributes: + Input: state=terminated, terminated.reason=PostStartHookError + The Terminated exit code is set to 137 which could indicate for example that the container ran out of memory. + See https://containersolutions.github.io/runbooks/posts/kubernetes/crashloopbackoff/#step-3 """ podstatus = PodStatus() - podstatus.add_init_container_status("waiting", "PostStartHookError") + podstatus.add_init_container_status( + "terminated", "PostStartHookError", exit_code=137 + ) expected = ("Pod Error", "", "") actual = StatusData.determine_status_from_k8s(podstatus) self.assertEqual(actual, expected) - def test_terminated_init_container_reason_completed_etc(self): + def test_waiting_init_container_reason_error(self): """ - This scenario tests a k8s pod status object with a terminated init container. 
- Input: terminated=true, init containers = true, terminated.reason = Completed + This scenario tests a k8s pod status object with an init container with the following status attributes: + Input: state=waiting, terminated.reason=PostStartHookError """ podstatus = PodStatus() - podstatus.add_init_container_status("waiting", "Completed") + podstatus.add_init_container_status("waiting", "PostStartHookError") expected = ("Pod Error", "", "") actual = StatusData.determine_status_from_k8s(podstatus) self.assertEqual(actual, expected) + def test_multiple_container_status_terminated_init_container_running_container( + self, + ): + """ + This scenario tests a k8s pod status object with two containers. + Init container input: state=terminated, terminated.reason=Completed + Regular container input: state=running, ready=true + """ + podstatus = PodStatus() + podstatus.add_init_container_status("terminated", "Completed") + podstatus.add_container_status("running", "", ready=True) + expected = ("Running", "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + + def test_missing_container_status(self): + """This scenario tests an empty k8s pod status object.""" + podstatus = PodStatus() + expected = (None, "", "") + actual = StatusData.determine_status_from_k8s(podstatus) + self.assertEqual(actual, expected) + class TestStatusDataUtilities(unittest.TestCase): """Verifies the app status utility methods.""" From 67f62b996789f6848a219dbc0cb3848629d92272 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Fri, 23 Aug 2024 08:43:21 +0200 Subject: [PATCH 10/14] Added some delays between pod events to simulate more realistic behaviour --- tests/test_status_data.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_status_data.py b/tests/test_status_data.py index b9ad37a..e9acb21 100644 --- a/tests/test_status_data.py +++ b/tests/test_status_data.py @@ -1,3 +1,4 @@ +import time import unittest from
serve_event_listener.status_data import StatusData @@ -74,14 +75,20 @@ def test_replica_scenario(self): self.new_pod.create(release) self.status_data.update({"object": self.new_pod}) + time.sleep(0.01) + self.pod.delete() self.status_data.update({"object": self.pod}) + time.sleep(0.01) + self.new_pod.running() self.status_data.update({"object": self.new_pod}) self.assertEqual(self.status_data.status_data[release].get("status"), "Running") + time.sleep(0.01) + self.new_pod.delete() self.status_data.update({"object": self.new_pod}) @@ -103,6 +110,8 @@ def test_valid_and_invalid_image_edits(self): assert self.status_data.status_data[release].get("status") == "Created" + time.sleep(0.01) + self.pod.running() self.status_data.update({"object": self.pod}) assert self.status_data.status_data[release].get("status") == "Running" @@ -110,6 +119,9 @@ def test_valid_and_invalid_image_edits(self): # Pod: invalid_pod self.invalid_pod = Pod() self.invalid_pod.create(release) + + time.sleep(0.01) + self.invalid_pod.error_image_pull() self.status_data.update({"object": self.invalid_pod}) assert self.status_data.status_data[release].get("status") == "Image Error" @@ -119,6 +131,9 @@ def test_valid_and_invalid_image_edits(self): # Pod: valid_pod self.valid_pod = Pod() self.valid_pod.create(release) + + time.sleep(0.01) + self.valid_pod.running() self.status_data.update({"object": self.valid_pod}) assert self.status_data.status_data[release].get("status") == "Running" From af3d0d3698941d2f427908d5972a5b5dff2bdeb3 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Fri, 23 Aug 2024 09:17:27 +0200 Subject: [PATCH 11/14] Limited python version to 3.12 --- .github/workflows/ci.yaml | 2 +- .github/workflows/publish.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1be4f79..79629ab 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -40,7 +40,7 @@ jobs: strategy: matrix: - python-version: 
[3.9, 3.11, 3.12] + python-version: [3.12] steps: - name: Checkout code diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index e1f4dd0..ad89ca8 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -27,7 +27,7 @@ jobs: strategy: matrix: - python-version: [3.8, 3.9, 3.11, 3.12] + python-version: [3.12] steps: - name: Checkout code From 20c8d09ed2adc1f96b89cec2d33dd5a8c251f6f0 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Fri, 23 Aug 2024 11:39:01 +0200 Subject: [PATCH 12/14] Added method to fetch status directly from k8s to be used at strategic events --- serve_event_listener/event_listener.py | 15 +++--- serve_event_listener/status_data.py | 69 +++++++++++++++++++++++++- serve_event_listener/status_queue.py | 1 + 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/serve_event_listener/event_listener.py b/serve_event_listener/event_listener.py index 60c7c00..5bfc264 100644 --- a/serve_event_listener/event_listener.py +++ b/serve_event_listener/event_listener.py @@ -77,10 +77,11 @@ def setup(self, **kwargs: Optional[Any]) -> None: "\n\n\t{}\n\t Running Setup Process \n\t{}\n".format("#" * 30, "#" * 30) ) try: - self.check_status() + self.check_serve_api_status() self.setup_client() self.token = self.fetch_token() self._status_data = StatusData() + self._status_data.set_k8s_api_client(self.client, self.namespace) self._status_queue = StatusQueue(self.post, self.token) self.setup_complete = True except Exception as e: @@ -149,7 +150,7 @@ def listen(self) -> None: else: logger.warning("Setup not completed - run .setup() first") - def check_status(self) -> bool: + def check_serve_api_status(self) -> bool: """ Checks the status of the Serve API. 
@@ -193,7 +194,7 @@ def setup_client(self) -> None: self.watch = watch.Watch() def list_all_pods(self): - logger.info("Listing all pods and status codes") + logger.info("Listing all pods and their status codes") try: api_response = self.client.list_namespaced_pod( @@ -201,9 +202,11 @@ def list_all_pods(self): ) for pod in api_response.items: - # TODO get_status() similar to logic in StatusData - # Make this a helper function and unit test? - print(f"{pod.metadata.name} with status {pod.status}") + release = pod.metadata.labels.get("release") + app_status = StatusData.determine_status_from_k8s(pod.status) + logger.info( + f"Release={release}, {pod.metadata.name} with status {app_status}" + ) except ApiException as e: logger.warning( f"Exception when calling CoreV1Api->list_namespaced_pod. {e}" diff --git a/serve_event_listener/status_data.py b/serve_event_listener/status_data.py index 10b350d..f2a2d53 100644 --- a/serve_event_listener/status_data.py +++ b/serve_event_listener/status_data.py @@ -4,6 +4,8 @@ from typing import Any, Dict, Tuple, Union import requests +from kubernetes import client +from kubernetes.client.exceptions import ApiException from kubernetes.client.models import V1PodStatus logger = logging.getLogger(__name__) @@ -26,6 +28,8 @@ class StatusData: def __init__(self): self.status_data = {} + self.k8s_api_client = None + self.namespace = "default" @staticmethod def determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str]: @@ -114,6 +118,57 @@ def get_timestamp_as_str() -> str: ) return current_utc_time + def set_k8s_api_client(self, k8s_api_client: client.CoreV1Api, namespace: str): + self.k8s_api_client = k8s_api_client + self.namespace = namespace + + def fetch_status_from_k8s_api(self, release: str) -> Tuple[str, str, str]: + """ + Get the actual status of a release from k8s via the client API. + Because this can be as costly operation it is only used at critical times such as deleted pods. 
+ + Returns: + - A tuple consisting of status, pod_message, container_message + """ + logger.debug( + f"Getting the status of release {release} directly from k8s via the api client" + ) + + status = "Unset" + pod_message = container_message = "" + + try: + api_response = self.k8s_api_client.list_namespaced_pod( + self.namespace, limit=500, timeout_seconds=120, watch=False + ) + + for pod in api_response.items: + if pod.metadata.labels.get("release") == release: + pod_status = StatusData.determine_status_from_k8s(pod.status) + if status == "Unset": + status = pod_status + logger.debug( + f"Preliminary status of release {release} set from Unset to {status}" + ) + elif status == "Deleted": + # All other statuses override Deleted + status = pod_status + logger.debug( + f"Preliminary status of release {release} set from Deleted to {status}" + ) + elif pod_status == "Running": + # Running overrides all other statuses + status = pod_status + logger.debug( + f"Preliminary status of release {release} set to {status}" + ) + except ApiException as e: + logger.warning( + f"Exception when calling CoreV1Api->list_namespaced_pod. {e}" + ) + + return status, pod_message, container_message + def update(self, event: dict) -> None: """ Process a Kubernetes pod event and update the status_data. @@ -122,7 +177,7 @@ def update(self, event: dict) -> None: - event (dict): The Kubernetes pod event. - status_data (dict): Dictionary containing status info. - Returns: + Sets: - status_data (dict): Updated dictionary containing status info. 
- release (str): The release of the updated status """ @@ -218,10 +273,20 @@ def update_or_create_status( release not in status_data or creation_timestamp >= status_data[release]["creation_timestamp"] ): + + status = "Deleted" if deletion_timestamp else status + + if status == "Deleted": + # Status Deleted is a destructive action + # Therefore we double-check the k8s status directly upon detecting this + status = self.fetch_status_from_k8s_api(release) + if status != "Deleted": + deletion_timestamp = None + status_data[release] = { "creation_timestamp": creation_timestamp, "deletion_timestamp": deletion_timestamp, - "status": "Deleted" if deletion_timestamp else status, + "status": status, "event-ts": StatusData.get_timestamp_as_str(), "sent": False, } diff --git a/serve_event_listener/status_queue.py b/serve_event_listener/status_queue.py index ee77a05..2953df6 100644 --- a/serve_event_listener/status_queue.py +++ b/serve_event_listener/status_queue.py @@ -31,6 +31,7 @@ def process(self): release = status_data["release"] new_status = status_data["new-status"] if new_status == "Deleted": + # TODO: Be careful with deleting releases! Check the truth in k8s. logger.info( f"Processing release: {release}. New status is Deleted!" ) From 1a60ab6133832ad16cae5ebb0bd69ab1b96bc724 Mon Sep 17 00:00:00 2001 From: alfredeen Date: Fri, 23 Aug 2024 12:46:11 +0200 Subject: [PATCH 13/14] Fixed return of tuple methods. Releases with no pods are now set to status Deleted. 
--- serve_event_listener/status_data.py | 36 ++++++++++++++++++---------- serve_event_listener/status_queue.py | 2 +- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/serve_event_listener/status_data.py b/serve_event_listener/status_data.py index f2a2d53..1e44c2d 100644 --- a/serve_event_listener/status_data.py +++ b/serve_event_listener/status_data.py @@ -46,7 +46,7 @@ def determine_status_from_k8s(status_object: V1PodStatus) -> Tuple[str, str, str - status_object (dict): The Kubernetes status object. Returns: - - str: The status of the pod. + - Tuple[str, str, str]: The status of the pod, container message, pod message """ empty_message = "" pod_message = status_object.message if status_object.message else empty_message @@ -128,14 +128,16 @@ def fetch_status_from_k8s_api(self, release: str) -> Tuple[str, str, str]: Because this can be as costly operation it is only used at critical times such as deleted pods. Returns: - - A tuple consisting of status, pod_message, container_message + - Tuple[str, str, str]: The status of the pod, container message, pod message + + If no pod matches the release, then return None, "", "" """ logger.debug( f"Getting the status of release {release} directly from k8s via the api client" ) - status = "Unset" - pod_message = container_message = "" + status = None + container_message = pod_message = "" try: api_response = self.k8s_api_client.list_namespaced_pod( @@ -144,11 +146,13 @@ def fetch_status_from_k8s_api(self, release: str) -> Tuple[str, str, str]: for pod in api_response.items: if pod.metadata.labels.get("release") == release: - pod_status = StatusData.determine_status_from_k8s(pod.status) - if status == "Unset": + pod_status, container_message, pod_message = ( + StatusData.determine_status_from_k8s(pod.status) + ) + if status is None: status = pod_status logger.debug( - f"Preliminary status of release {release} set from Unset to {status}" + f"Preliminary status of release {release} set from None to {status}" ) 
elif status == "Deleted": # All other statuses override Deleted @@ -167,7 +171,7 @@ def fetch_status_from_k8s_api(self, release: str) -> Tuple[str, str, str]: f"Exception when calling CoreV1Api->list_namespaced_pod. {e}" ) - return status, pod_message, container_message + return status, container_message, pod_message def update(self, event: dict) -> None: """ @@ -189,7 +193,7 @@ def update(self, event: dict) -> None: if pod: status_object = pod.status - status, pod_message, container_message = ( + status, container_message, pod_message = ( StatusData.determine_status_from_k8s(status_object) ) release = pod.metadata.labels.get("release") @@ -279,9 +283,17 @@ def update_or_create_status( if status == "Deleted": # Status Deleted is a destructive action # Therefore we double-check the k8s status directly upon detecting this - status = self.fetch_status_from_k8s_api(release) - if status != "Deleted": - deletion_timestamp = None + if self.k8s_api_client: + # Only use if the k8s client api has been set + # Unit tests for example do not currently set a k8s api + status, *_ = self.fetch_status_from_k8s_api(release) + + if status is None: + # No pod with this release found. Set status to Deleted + status = "Deleted" + + if status != "Deleted": + deletion_timestamp = None status_data[release] = { "creation_timestamp": creation_timestamp, diff --git a/serve_event_listener/status_queue.py b/serve_event_listener/status_queue.py index 2953df6..a2862db 100644 --- a/serve_event_listener/status_queue.py +++ b/serve_event_listener/status_queue.py @@ -30,8 +30,8 @@ def process(self): release = status_data["release"] new_status = status_data["new-status"] + if new_status == "Deleted": - # TODO: Be careful with deleting releases! Check the truth in k8s. logger.info( f"Processing release: {release}. New status is Deleted!" 
) From 2fe018af4868440b044980b8318cac3f6ed579fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Alfred=C3=A9en?= Date: Fri, 23 Aug 2024 16:23:31 +0200 Subject: [PATCH 14/14] Update serve_event_listener/status_data.py Co-authored-by: Nikita Churikov <8545082+churnikov@users.noreply.github.com> --- serve_event_listener/status_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serve_event_listener/status_data.py b/serve_event_listener/status_data.py index 1e44c2d..436543c 100644 --- a/serve_event_listener/status_data.py +++ b/serve_event_listener/status_data.py @@ -270,7 +270,7 @@ def update_or_create_status( logger.debug( f"Release {release}. Status data before update:{status_data}. \ - release in status data? {release not in status_data}. \ + {(release in status_data)=}? \ creation_timestamp={creation_timestamp}, deletion_timestamp={deletion_timestamp}" ) if (