diff --git a/apitally/client/client_asyncio.py b/apitally/client/client_asyncio.py index e86dca5..9012361 100644 --- a/apitally/client/client_asyncio.py +++ b/apitally/client/client_asyncio.py @@ -6,7 +6,7 @@ import time from contextlib import suppress from functools import partial -from typing import Any, AsyncIterator, Dict, Optional, Tuple, Union +from typing import Any, AsyncIterator, Dict, Optional, Union from uuid import UUID import backoff @@ -40,7 +40,7 @@ def __init__( self.proxy = proxy self._stop_sync_loop = False self._sync_loop_task: Optional[asyncio.Task] = None - self._sync_data_queue: asyncio.Queue[Tuple[float, Dict[str, Any]]] = asyncio.Queue() + self._sync_data_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() self._set_startup_data_task: Optional[asyncio.Task] = None def get_http_client(self) -> httpx.AsyncClient: @@ -103,20 +103,19 @@ async def send_startup_data(self, client: httpx.AsyncClient) -> None: async def send_sync_data(self, client: httpx.AsyncClient) -> None: data = self.get_sync_data() - self._sync_data_queue.put_nowait((time.time(), data)) + self._sync_data_queue.put_nowait(data) i = 0 while not self._sync_data_queue.empty(): - timestamp, data = self._sync_data_queue.get_nowait() + data = self._sync_data_queue.get_nowait() try: - if (time_offset := time.time() - timestamp) <= MAX_QUEUE_TIME: + if time.time() - data["timestamp"] <= MAX_QUEUE_TIME: if i > 0: await asyncio.sleep(random.uniform(0.1, 0.3)) - data["time_offset"] = time_offset await self._send_sync_data(client, data) i += 1 except httpx.HTTPError: - self._sync_data_queue.put_nowait((timestamp, data)) + self._sync_data_queue.put_nowait(data) break finally: self._sync_data_queue.task_done() diff --git a/apitally/client/client_base.py b/apitally/client/client_base.py index 1124e98..df3c22b 100644 --- a/apitally/client/client_base.py +++ b/apitally/client/client_base.py @@ -89,6 +89,7 @@ def add_uuids_to_data(self, data: Dict[str, Any]) -> Dict[str, Any]: def get_sync_data(self) -> Dict[str, Any]: data = { + "timestamp": time.time(), "requests": self.request_counter.get_and_reset_requests(), "validation_errors": self.validation_error_counter.get_and_reset_validation_errors(), "server_errors": self.server_error_counter.get_and_reset_server_errors(), diff --git a/apitally/client/client_threading.py b/apitally/client/client_threading.py index 66c2411..b76bfdc 100644 --- a/apitally/client/client_threading.py +++ b/apitally/client/client_threading.py @@ -8,7 +8,7 @@ from io import BufferedReader from queue import Queue from threading import Event, Thread -from typing import Any, Callable, Dict, Optional, Tuple +from typing import Any, Callable, Dict, Optional from uuid import UUID import backoff @@ -58,7 +58,7 @@ def __init__( self.proxies = {"https": proxy} if proxy else None self._thread: Optional[Thread] = None self._stop_sync_loop = Event() - self._sync_data_queue: Queue[Tuple[float, Dict[str, Any]]] = Queue() + self._sync_data_queue: Queue[Dict[str, Any]] = Queue() def start_sync_loop(self) -> None: self._stop_sync_loop.clear() @@ -114,20 +114,19 @@ def send_startup_data(self, session: requests.Session) -> None: def send_sync_data(self, session: requests.Session) -> None: data = self.get_sync_data() - self._sync_data_queue.put_nowait((time.time(), data)) + self._sync_data_queue.put_nowait(data) i = 0 while not self._sync_data_queue.empty(): - timestamp, data = self._sync_data_queue.get_nowait() + data = self._sync_data_queue.get_nowait() try: - if (time_offset := time.time() - timestamp) <= MAX_QUEUE_TIME: + if time.time() - data["timestamp"] <= MAX_QUEUE_TIME: if i > 0: time.sleep(random.uniform(0.1, 0.3)) - data["time_offset"] = time_offset self._send_sync_data(session, data) i += 1 except requests.RequestException: - self._sync_data_queue.put_nowait((timestamp, data)) + self._sync_data_queue.put_nowait(data) break finally: self._sync_data_queue.task_done()