From 914c3e3f6193907214304db234b98e5845b354a2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andr=C3=A9=20=22decko=22=20de=20Brito?=
Date: Mon, 8 Apr 2024 19:26:27 -0300
Subject: [PATCH] Adds two new metrics for the Task system.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #3821

Co-authored-by: Matthias Dellweg <2500@gmx.de>
Co-authored-by: Ľuboš Mjachky
Co-authored-by: Grant Gainey
Co-authored-by: Ina Panova
---
 .github/workflows/scripts/install.sh          |  2 +-
 CHANGES/3821.feature                          |  2 +
 pulpcore/constants.py                         |  1 +
 pulpcore/tasking/worker.py                    | 80 +++++++++++++++++--
 pulpcore/tests/functional/__init__.py         | 31 +++++++
 pulpcore/tests/functional/api/test_tasking.py | 46 +++++++++++
 .../tests/functional/assets/otel_server.py    | 54 ++++++++++++-
 requirements.txt                              |  8 +-
 template_config.yml                           |  1 +
 9 files changed, 211 insertions(+), 14 deletions(-)
 create mode 100644 CHANGES/3821.feature

diff --git a/.github/workflows/scripts/install.sh b/.github/workflows/scripts/install.sh
index 43de7e195c..b20d5eb7be 100755
--- a/.github/workflows/scripts/install.sh
+++ b/.github/workflows/scripts/install.sh
@@ -127,7 +127,7 @@ if [ "$TEST" = "azure" ]; then
     command: "azurite-blob --blobHost 0.0.0.0 --cert /etc/pulp/azcert.pem --key /etc/pulp/azkey.pem"' vars/main.yaml
   sed -i -e '$a azure_test: true\
 pulp_scenario_settings: {"domain_enabled": true}\
-pulp_scenario_env: {"otel_bsp_max_export_batch_size": 1, "otel_bsp_max_queue_size": 1, "otel_exporter_otlp_endpoint": "http://localhost:4318", "otel_exporter_otlp_protocol": "http/protobuf", "pulp_otel_enabled": "true"}\
+pulp_scenario_env: {"otel_bsp_max_export_batch_size": 1, "otel_bsp_max_queue_size": 1, "otel_exporter_otlp_endpoint": "http://localhost:4318", "otel_exporter_otlp_protocol": "http/protobuf", "otel_metric_export_interval": 800, "pulp_otel_enabled": "true"}\
 ' vars/main.yaml
 fi
diff --git a/CHANGES/3821.feature b/CHANGES/3821.feature
new file mode 100644
index 0000000000..079580bb8f
--- /dev/null
+++ b/CHANGES/3821.feature
@@ -0,0 +1,2 @@
+Added two new metrics related to Tasks: `pulp_tasks_unblocked_waiting_queue` reports the number of unblocked tasks that have been waiting in the queue for longer than five (5) seconds,
+while `pulp_tasks_longest_unblocked_waiting_time` reports the longest time, in seconds, that an unblocked task has been waiting in the queue.
diff --git a/pulpcore/constants.py b/pulpcore/constants.py
index 30759e6696..a6852a3373 100644
--- a/pulpcore/constants.py
+++ b/pulpcore/constants.py
@@ -12,6 +12,7 @@
 TASK_DISPATCH_LOCK = 21
 TASK_SCHEDULING_LOCK = 42
 TASK_UNBLOCKING_LOCK = 84
+TASK_METRICS_HEARTBEAT_LOCK = 74
 
 #: All valid task states.
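The worker publishes both values through OpenTelemetry's synchronous gauge API, which is available in the 1.24.0 / 0.45b0 packages this patch pins. The sketch below is not part of the patch; it only illustrates, under the assumption of a locally configured SDK with a console exporter instead of Pulp's OTLP endpoint, how get_meter(), create_gauge(), and set() fit together, using made-up sample values.

```python
# Illustrative sketch only (not part of the patch): exercises the same
# get_meter()/create_gauge()/set() calls the worker uses, but wires them to a
# console exporter. Requires opentelemetry-sdk >= 1.24.0 (sync gauge support).
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader

# Export every 800 ms, mirroring the otel_metric_export_interval used in CI above.
reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=800)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

meter = metrics.get_meter(__name__)
queue_gauge = meter.create_gauge(
    name="tasks_unblocked_queue",
    description="Number of unblocked tasks waiting in the queue.",
    unit="tasks",
)
wait_gauge = meter.create_gauge(
    name="tasks_longest_unblocked_time",
    description="The age of the longest waiting task.",
    unit="seconds",
)

# Hypothetical sample values; the worker derives the real ones from a Task table query.
queue_gauge.set(3)
wait_gauge.set(12)
```

Run standalone, this prints the two gauges to the console roughly every 800 ms, which is the cadence the CI configuration above asks the exporter for.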
diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py
index c22779ef5e..1af7e90083 100644
--- a/pulpcore/tasking/worker.py
+++ b/pulpcore/tasking/worker.py
@@ -7,13 +7,15 @@
 import signal
 import socket
 import contextlib
-from datetime import timedelta
+from datetime import datetime, timedelta
 from multiprocessing import Process
 from tempfile import TemporaryDirectory
 from packaging.version import parse as parse_version
+from opentelemetry.metrics import get_meter
 
 from django.conf import settings
 from django.db import connection
+from django.db.models import Case, Count, F, Max, Value, When
 from django.utils import timezone
 
 from pulpcore.constants import (
@@ -21,6 +23,7 @@ from pulpcore.constants import (
     TASK_INCOMPLETE_STATES,
     TASK_SCHEDULING_LOCK,
     TASK_UNBLOCKING_LOCK,
+    TASK_METRICS_HEARTBEAT_LOCK,
 )
 from pulpcore.exceptions import AdvisoryLockError
 from pulpcore.app.apps import pulp_plugin_configs
@@ -38,12 +41,18 @@
 _logger = logging.getLogger(__name__)
 random.seed()
 
+# The following four constants are current "best guesses".
+# Unless/until we can provide reasonable ways to decide to change their values,
+# they will live as constants instead of "proper" settings.
+
 # Number of heartbeats for a task to finish on graceful worker shutdown (approx)
 TASK_GRACE_INTERVAL = 3
 # Number of heartbeats between attempts to kill the subprocess (approx)
 TASK_KILL_INTERVAL = 1
 # Number of heartbeats between cleaning up worker processes (approx)
 WORKER_CLEANUP_INTERVAL = 100
+# Threshold time in seconds of an unblocked task before we consider a queue stalled
+THRESHOLD_UNBLOCKED_WAITING_TIME = 5
 
 
 class PulpcoreWorker:
@@ -55,7 +64,8 @@
     def __init__(self):
         self.task = None
         self.name = f"{os.getpid()}@{socket.getfqdn()}"
-        self.heartbeat_period = settings.WORKER_TTL / 3
+        self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3)
+        self.last_metric_heartbeat = timezone.now()
         self.versions = {app.label: app.version for app in pulp_plugin_configs()}
         self.cursor = connection.cursor()
         self.worker = self.handle_worker_heartbeat()
@@ -64,6 +74,19 @@ def __init__(self):
             WORKER_CLEANUP_INTERVAL / 10, WORKER_CLEANUP_INTERVAL
         )
 
+        meter = get_meter(__name__)
+        self.tasks_unblocked_queue_meter = meter.create_gauge(
+            name="tasks_unblocked_queue",
+            description="Number of unblocked tasks waiting in the queue.",
+            unit="tasks",
+        )
+
+        self.tasks_longest_unblocked_time_meter = meter.create_gauge(
+            name="tasks_longest_unblocked_time",
+            description="The age of the longest waiting task.",
+            unit="seconds",
+        )
+
         # Add a file descriptor to trigger select on signals
         self.sentinel, sentinel_w = os.pipe()
         os.set_blocking(self.sentinel, False)
@@ -90,6 +113,8 @@ def _signal_handler(self, thesignal, frame):
     def _pg_notify_handler(self, notification):
         if notification.channel == "pulp_worker_wakeup":
             self.wakeup = True
+        elif notification.channel == "pulp_worker_metrics_heartbeat":
+            self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
         elif self.task and notification.channel == "pulp_worker_cancel":
             if notification.payload == str(self.task.pk):
                 self.cancel_task = True
@@ -140,7 +165,7 @@ def worker_cleanup(self):
             qs.delete()
 
     def beat(self):
-        if self.worker.last_heartbeat < timezone.now() - timedelta(seconds=self.heartbeat_period):
+        if self.worker.last_heartbeat < timezone.now() - self.heartbeat_period:
             self.worker = self.handle_worker_heartbeat()
             if self.task_grace_timeout > 0:
                 self.task_grace_timeout -= 1
@@ -150,6 +175,7 @@ def beat(self):
                 self.worker_cleanup()
             with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK):
                 dispatch_scheduled_tasks()
+            self.record_unblocked_waiting_tasks_metric()
 
     def notify_workers(self):
         self.cursor.execute("NOTIFY pulp_worker_wakeup")
@@ -223,7 +249,7 @@ def identify_unblocked_tasks(self):
                 _logger.debug("Marking canceling task %s unblocked.", task.pk)
                 task.unblock()
                 changed = True
-                # Don't consider this tasks reosurces as held.
+                # Don't consider this task's resources as held.
                 continue
 
             elif (
@@ -244,6 +270,7 @@ def identify_unblocked_tasks(self):
             # Record the resources of the pending task
             taken_exclusive_resources.update(exclusive_resources)
             taken_shared_resources.update(shared_resources)
+
         return changed
 
     def iter_tasks(self):
@@ -293,7 +320,7 @@ def sleep(self):
         _logger.debug(_("Worker %s entering sleep state."), self.name)
         while not self.shutdown_requested and not self.wakeup:
             r, w, x = select.select(
-                [self.sentinel, connection.connection], [], [], self.heartbeat_period
+                [self.sentinel, connection.connection], [], [], self.heartbeat_period.seconds
             )
             self.beat()
             if connection.connection in r:
@@ -329,7 +356,7 @@ def supervise_task(self, task):
                 [self.sentinel, connection.connection, task_process.sentinel],
                 [],
                 [],
-                self.heartbeat_period,
+                self.heartbeat_period.seconds,
             )
             self.beat()
             if connection.connection in r:
@@ -392,6 +419,45 @@ def handle_available_tasks(self):
                 keep_looping = True
                 self.supervise_task(task)
 
+    def record_unblocked_waiting_tasks_metric(self):
+        if os.getenv("PULP_OTEL_ENABLED", "").lower() != "true":
+            return
+
+        now = timezone.now()
+        if now > self.last_metric_heartbeat + self.heartbeat_period:
+            with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(
+                TASK_METRICS_HEARTBEAT_LOCK
+            ):
+                # For performance reasons we aggregate these statistics on a single database call.
+                unblocked_tasks_stats = (
+                    Task.objects.filter(unblocked_at__isnull=False, started_at__isnull=True)
+                    .annotate(unblocked_for=Value(timezone.now()) - F("unblocked_at"))
+                    .aggregate(
+                        longest_unblocked_waiting_time=Max(
+                            "unblocked_for", default=timezone.timedelta(0)
+                        ),
+                        unblocked_tasks_count_gte_threshold=Count(
+                            Case(
+                                When(
+                                    unblocked_for__gte=Value(
+                                        timezone.timedelta(seconds=THRESHOLD_UNBLOCKED_WAITING_TIME)
+                                    ),
+                                    then=1,
+                                )
+                            )
+                        ),
+                    )
+                )
+
+                self.tasks_unblocked_queue_meter.set(
+                    unblocked_tasks_stats["unblocked_tasks_count_gte_threshold"]
+                )
+                self.tasks_longest_unblocked_time_meter.set(
+                    unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds
+                )
+
+                self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")
+
     def run(self, burst=False):
         with WorkerDirectory(self.name):
             signal.signal(signal.SIGINT, self._signal_handler)
@@ -400,6 +466,7 @@ def run(self, burst=False):
             # Subscribe to pgsql channels
             connection.connection.add_notify_handler(self._pg_notify_handler)
             self.cursor.execute("LISTEN pulp_worker_cancel")
+            self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
             if burst:
                 self.handle_available_tasks()
             else:
@@ -412,5 +479,6 @@ def run(self, burst=False):
                         break
                     self.sleep()
                 self.cursor.execute("UNLISTEN pulp_worker_wakeup")
+                self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
                 self.cursor.execute("UNLISTEN pulp_worker_cancel")
             self.shutdown()
diff --git a/pulpcore/tests/functional/__init__.py b/pulpcore/tests/functional/__init__.py
index 1dad7401d2..6a92729e9a 100644
--- a/pulpcore/tests/functional/__init__.py
+++ b/pulpcore/tests/functional/__init__.py
@@ -568,6 +568,37 @@ async def _send_request():
     return _received_otel_span
 
 
+@pytest.fixture(scope="session")
+def received_otel_metrics():
+    """A fixture for checking the presence of specific metrics on the otel collector server.
+
+    Ensure the collector server is up and running before executing tests with this fixture. To do
+    so, please run the server as follows: python3 pulpcore/tests/functional/assets/otel_server.py
+    """
+
+    def _received_otel_metric(data, retries=3):
+        if os.environ.get("PULP_OTEL_ENABLED") != "true":
+            # pretend everything is working as expected if tests are run from
+            # a non-configured runner
+            return True
+
+        async def _send_request():
+            async with aiohttp.ClientSession(raise_for_status=False) as session:
+                otel_server_url = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
+                async with session.post(f"{otel_server_url}/metrics_test", json=data) as response:
+                    return response.status
+
+        while retries:
+            status = asyncio.run(_send_request())
+            if status == 200:
+                return True
+            sleep(2)
+            retries -= 1
+        return False
+
+    return _received_otel_metric
+
+
 @pytest.fixture
 def test_path():
     return os.getenv("PYTEST_CURRENT_TEST").split()[0]
diff --git a/pulpcore/tests/functional/api/test_tasking.py b/pulpcore/tests/functional/api/test_tasking.py
index 3a90f8ec5b..76cdaaa3aa 100644
--- a/pulpcore/tests/functional/api/test_tasking.py
+++ b/pulpcore/tests/functional/api/test_tasking.py
@@ -1,9 +1,11 @@
 """Tests related to the tasking system."""
 
+import os
 import json
 import pytest
 import subprocess
 import time
+
 from aiohttp import BasicAuth
 from urllib.parse import urljoin
 from uuid import uuid4
@@ -349,3 +351,47 @@ def test_task_version_prevent_pickup(dispatch_task, pulpcore_bindings):
     task = pulpcore_bindings.TasksApi.read(task_href)
     assert task.state == "waiting"
     pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
+
+
+def test_emitting_unblocked_task_telemetry(
+    dispatch_task, pulpcore_bindings, pulp_settings, received_otel_metrics
+):
+    if os.getenv("PULP_OTEL_ENABLED", "").lower() != "true":
+        pytest.skip("Need PULP_OTEL_ENABLED to run this test.")
+
+    # Check how many online workers are ready to pick up tasks
+    workers_online = pulpcore_bindings.WorkersApi.list(online="true").count
+
+    # We need to generate long-running tasks to block the workers from executing other tasks
+    resident_task_hrefs = [
+        dispatch_task("pulpcore.app.tasks.test.sleep", args=(30,))
+        for _ in range(workers_online)
+    ]
+
+    # Then we dispatch a quick task; it is unblocked, but it has to wait in the queue
+    task_href = dispatch_task("pulpcore.app.tasks.test.sleep", args=(0,))
+
+    task = pulpcore_bindings.TasksApi.read(task_href)
+    assert task.state == "waiting"
+
+    # And trigger the metrics
+    assert received_otel_metrics(
+        {
+            "name": "tasks_unblocked_queue",
+            "description": "Number of unblocked tasks waiting in the queue.",
+            "unit": "tasks",
+        }
+    )
+
+    assert received_otel_metrics(
+        {
+            "name": "tasks_longest_unblocked_time",
+            "description": "The age of the longest waiting task.",
+            "unit": "seconds",
+        }
+    )
+
+    for resident_task_href in resident_task_hrefs:
+        pulpcore_bindings.TasksApi.tasks_cancel(resident_task_href, {"state": "canceled"})
diff --git a/pulpcore/tests/functional/assets/otel_server.py b/pulpcore/tests/functional/assets/otel_server.py
index cd19ca3555..c9acf36a71 100644
--- a/pulpcore/tests/functional/assets/otel_server.py
+++ b/pulpcore/tests/functional/assets/otel_server.py
@@ -8,6 +8,9 @@
 
 from aiohttp import web
 from opentelemetry.proto.trace.v1.trace_pb2 import TracesData
+from opentelemetry.proto.metrics.v1.metrics_pb2 import MetricsData
+
+_logger = logging.getLogger(__name__)
 
 
 class ThreadedAiohttpServer(threading.Thread):
@@ -47,12 +50,13 @@ def _otel_collector():
         or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") != "http://localhost:4318"
         or os.environ.get("OTEL_EXPORTER_OTLP_PROTOCOL") != "http/protobuf"
     ):
-        logging.info("Telemetry was not configured. Exiting...")
+        _logger.info("Telemetry was not configured. Exiting...")
         sys.exit(0)
     else:
-        logging.info("Booting up the otel collector server...")
+        _logger.info("Booting up the otel collector server...")
 
     spans = []
+    metrics = []
 
     async def _null_handler(request):
         raise web.HTTPOk()
@@ -70,6 +74,25 @@ async def _traces_handler(request):
                     spans.append(attrs)
         raise web.HTTPOk()
 
+    async def _metrics_handler(request):
+        disabled_metrics = {"http.server.active_requests"}
+
+        metrics_data = MetricsData()
+        metrics_data.ParseFromString(await request.read())
+        for resource_metric in metrics_data.resource_metrics:
+            for scope_metric in resource_metric.scope_metrics:
+                for metric in scope_metric.metrics:
+                    if metric.name in disabled_metrics:
+                        _logger.info("Dropping {} metric".format(metric.name))
+                        break
+                    translated_metric = {}
+                    translated_metric["name"] = metric.name
+                    translated_metric["description"] = metric.description
+                    translated_metric["unit"] = metric.unit
+                    metrics.append(translated_metric)
+                    _logger.info("Received a {} metric meter".format(translated_metric["name"]))
+        raise web.HTTPOk()
+
     async def _test_handler(request):
         try:
             attrs = await request.json()
@@ -85,12 +108,37 @@ async def _test_handler(request):
         else:
             raise web.HTTPNotFound()
 
+    async def _metrics_test_handler(request):
+        try:
+            attrs = await request.json()
+        except json.decoder.JSONDecodeError:
+            raise web.HTTPNotFound()
+
+        matched_metric = next(
+            (
+                metric
+                for metric in metrics
+                if all((metric.get(key) == value for key, value in attrs.items()))
+            ),
+            None,
+        )
+        if matched_metric:
+            metrics.remove(matched_metric)
+            raise web.HTTPOk()
+        else:
+            raise web.HTTPNotFound()
+
+    async def _read_handler(request):
+        return web.Response(text=json.dumps(metrics))
+
     app = web.Application()
     app.add_routes(
         [
-            web.post("/v1/metrics", _null_handler),
+            web.post("/v1/metrics", _metrics_handler),
             web.post("/v1/traces", _traces_handler),
             web.post("/test", _test_handler),
+            web.post("/metrics_test", _metrics_test_handler),
+            web.get("/read", _read_handler),
         ]
     )
diff --git a/requirements.txt b/requirements.txt
index 54da0e86ff..2525c2a379 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,10 +23,10 @@ jinja2>=3.1,<=3.1.4
 json_stream>=2.3.2,<2.4
 jq>=1.6.0,<1.8.0
 PyOpenSSL<25.0
-opentelemetry-distro[otlp]>=0.38b0,<=0.45b0
-opentelemetry-exporter-otlp-proto-http>=1.17.0,<=1.24.0
-opentelemetry-instrumentation-django>=0.38b0,<=0.45b0
-opentelemetry-instrumentation-wsgi>=0.38b0,<=0.45b0
+opentelemetry-distro[otlp]>=0.45b0,<=0.45b0
+opentelemetry-exporter-otlp-proto-http>=1.24.0,<=1.24.0
+opentelemetry-instrumentation-django>=0.45b0,<=0.45b0
+opentelemetry-instrumentation-wsgi>=0.45b0,<=0.45b0
 protobuf>=4.21.1,<5.26.2
 pulp-glue>=0.18.0,<0.26
 pygtrie>=2.5,<=2.5.0
diff --git a/template_config.yml b/template_config.yml
index 4824f34312..87c1547ae8 100644
--- a/template_config.yml
+++ b/template_config.yml
@@ -54,6 +54,7 @@ pulp_env_azure:
   otel_bsp_max_queue_size: 1
   otel_exporter_otlp_endpoint: http://localhost:4318
   otel_exporter_otlp_protocol: http/protobuf
+  otel_metric_export_interval: 800
   pulp_otel_enabled: 'true'
 pulp_env_gcp: {}
 pulp_env_s3: {}
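For local verification, the toy collector in otel_server.py above keeps every metric it receives and serves the list back as JSON from its new GET /read endpoint. The snippet below is a minimal sketch, not part of the patch, of polling that endpoint while a worker is running; it assumes the collector is reachable at http://localhost:4318, which is the endpoint the CI settings above configure.

```python
# Illustrative sketch only: poll the test collector's /read endpoint and print the
# task metrics it has received. Assumes the collector from otel_server.py is running
# and reachable at the configured OTLP endpoint (http://localhost:4318 in CI).
import json
import os
import urllib.request

base_url = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")

with urllib.request.urlopen(f"{base_url}/read") as response:
    received = json.loads(response.read())

for metric in received:
    if metric["name"] in ("tasks_unblocked_queue", "tasks_longest_unblocked_time"):
        print(f'{metric["name"]} ({metric["unit"]}): {metric["description"]}')
```

The POST /metrics_test endpoint used by the received_otel_metrics fixture works against the same list, except that it removes the matched metric once it has been asserted.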