Skip to content

Commit

Permalink
Adds two new metrics for the Task system.
Browse files Browse the repository at this point in the history
Closes #3821

Co-authored-by: Matthias Dellweg <[email protected]>
Co-authored-by: Ľuboš Mjachky <[email protected]>
Co-authored-by: Grant Gainey <[email protected]>
Co-authored-by: Ina Panova <[email protected]>
  • Loading branch information
5 people committed May 23, 2024
1 parent 10bd731 commit 914c3e3
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/scripts/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ if [ "$TEST" = "azure" ]; then
command: "azurite-blob --blobHost 0.0.0.0 --cert /etc/pulp/azcert.pem --key /etc/pulp/azkey.pem"' vars/main.yaml
sed -i -e '$a azure_test: true\
pulp_scenario_settings: {"domain_enabled": true}\
pulp_scenario_env: {"otel_bsp_max_export_batch_size": 1, "otel_bsp_max_queue_size": 1, "otel_exporter_otlp_endpoint": "http://localhost:4318", "otel_exporter_otlp_protocol": "http/protobuf", "pulp_otel_enabled": "true"}\
pulp_scenario_env: {"otel_bsp_max_export_batch_size": 1, "otel_bsp_max_queue_size": 1, "otel_exporter_otlp_endpoint": "http://localhost:4318", "otel_exporter_otlp_protocol": "http/protobuf", "otel_metric_export_interval": 800, "pulp_otel_enabled": "true"}\
' vars/main.yaml
fi

Expand Down
2 changes: 2 additions & 0 deletions CHANGES/3821.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added two new metrics related to Tasks: `pulp_tasks_unblocked_waiting_queue` has the number of unblocked waiting tasks that have been waiting longer than five(5) seconds,
while `pulp_tasks_longest_unblocked_waiting_time` record the time in seconds of the longest waiting time for a task in the queue.
1 change: 1 addition & 0 deletions pulpcore/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TASK_DISPATCH_LOCK = 21
TASK_SCHEDULING_LOCK = 42
TASK_UNBLOCKING_LOCK = 84
TASK_METRICS_HEARTBEAT_LOCK = 74


#: All valid task states.
Expand Down
80 changes: 74 additions & 6 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@
import signal
import socket
import contextlib
from datetime import timedelta
from datetime import datetime, timedelta
from multiprocessing import Process
from tempfile import TemporaryDirectory
from packaging.version import parse as parse_version
from opentelemetry.metrics import get_meter

from django.conf import settings
from django.db import connection
from django.db.models import Case, Count, F, Max, Value, When
from django.utils import timezone

from pulpcore.constants import (
TASK_STATES,
TASK_INCOMPLETE_STATES,
TASK_SCHEDULING_LOCK,
TASK_UNBLOCKING_LOCK,
TASK_METRICS_HEARTBEAT_LOCK,
)
from pulpcore.exceptions import AdvisoryLockError
from pulpcore.app.apps import pulp_plugin_configs
Expand All @@ -38,12 +41,18 @@
_logger = logging.getLogger(__name__)
random.seed()

# The following four constants are current "best guesses".
# Unless/until we can provide reasonable ways to decide to change their values,
# they will live as constants instead of "proper" settings.

# Number of heartbeats for a task to finish on graceful worker shutdown (approx)
TASK_GRACE_INTERVAL = 3
# Number of heartbeats between attempts to kill the subprocess (approx)
TASK_KILL_INTERVAL = 1
# Number of heartbeats between cleaning up worker processes (approx)
WORKER_CLEANUP_INTERVAL = 100
# Threshold time in seconds of an unblocked task before we consider a queue stalled
THRESHOLD_UNBLOCKED_WAITING_TIME = 5


class PulpcoreWorker:
Expand All @@ -55,7 +64,8 @@ def __init__(self):

self.task = None
self.name = f"{os.getpid()}@{socket.getfqdn()}"
self.heartbeat_period = settings.WORKER_TTL / 3
self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3)
self.last_metric_heartbeat = timezone.now()
self.versions = {app.label: app.version for app in pulp_plugin_configs()}
self.cursor = connection.cursor()
self.worker = self.handle_worker_heartbeat()
Expand All @@ -64,6 +74,19 @@ def __init__(self):
WORKER_CLEANUP_INTERVAL / 10, WORKER_CLEANUP_INTERVAL
)

meter = get_meter(__name__)
self.tasks_unblocked_queue_meter = meter.create_gauge(
name="tasks_unblocked_queue",
description="Number of unblocked tasks waiting in the queue.",
unit="tasks",
)

self.tasks_longest_unblocked_time_meter = meter.create_gauge(
name="tasks_longest_unblocked_time",
description="The age of the longest waiting task.",
unit="seconds",
)

# Add a file descriptor to trigger select on signals
self.sentinel, sentinel_w = os.pipe()
os.set_blocking(self.sentinel, False)
Expand All @@ -90,6 +113,8 @@ def _signal_handler(self, thesignal, frame):
def _pg_notify_handler(self, notification):
if notification.channel == "pulp_worker_wakeup":
self.wakeup = True
elif notification.channel == "pulp_worker_metrics_heartbeat":
self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
elif self.task and notification.channel == "pulp_worker_cancel":
if notification.payload == str(self.task.pk):
self.cancel_task = True
Expand Down Expand Up @@ -140,7 +165,7 @@ def worker_cleanup(self):
qs.delete()

def beat(self):
if self.worker.last_heartbeat < timezone.now() - timedelta(seconds=self.heartbeat_period):
if self.worker.last_heartbeat < timezone.now() - self.heartbeat_period:
self.worker = self.handle_worker_heartbeat()
if self.task_grace_timeout > 0:
self.task_grace_timeout -= 1
Expand All @@ -150,6 +175,7 @@ def beat(self):
self.worker_cleanup()
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK):
dispatch_scheduled_tasks()
self.record_unblocked_waiting_tasks_metric()

def notify_workers(self):
self.cursor.execute("NOTIFY pulp_worker_wakeup")
Expand Down Expand Up @@ -223,7 +249,7 @@ def identify_unblocked_tasks(self):
_logger.debug("Marking canceling task %s unblocked.", task.pk)
task.unblock()
changed = True
# Don't consider this tasks reosurces as held.
# Don't consider this task's resources as held.
continue

elif (
Expand All @@ -244,6 +270,7 @@ def identify_unblocked_tasks(self):
# Record the resources of the pending task
taken_exclusive_resources.update(exclusive_resources)
taken_shared_resources.update(shared_resources)

return changed

def iter_tasks(self):
Expand Down Expand Up @@ -293,7 +320,7 @@ def sleep(self):
_logger.debug(_("Worker %s entering sleep state."), self.name)
while not self.shutdown_requested and not self.wakeup:
r, w, x = select.select(
[self.sentinel, connection.connection], [], [], self.heartbeat_period
[self.sentinel, connection.connection], [], [], self.heartbeat_period.seconds
)
self.beat()
if connection.connection in r:
Expand Down Expand Up @@ -329,7 +356,7 @@ def supervise_task(self, task):
[self.sentinel, connection.connection, task_process.sentinel],
[],
[],
self.heartbeat_period,
self.heartbeat_period.seconds,
)
self.beat()
if connection.connection in r:
Expand Down Expand Up @@ -392,6 +419,45 @@ def handle_available_tasks(self):
keep_looping = True
self.supervise_task(task)

def record_unblocked_waiting_tasks_metric(self):
if os.getenv("PULP_OTEL_ENABLED").lower() != "true":
return

now = timezone.now()
if now > self.last_metric_heartbeat + self.heartbeat_period:
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(
TASK_METRICS_HEARTBEAT_LOCK
):
# For performance reasons we aggregate these statistics on a single database call.
unblocked_tasks_stats = (
Task.objects.filter(unblocked_at__isnull=False, started_at__isnull=True)
.annotate(unblocked_for=Value(timezone.now()) - F("unblocked_at"))
.aggregate(
longest_unblocked_waiting_time=Max(
"unblocked_for", default=timezone.timedelta(0)
),
unblocked_tasks_count_gte_threshold=Count(
Case(
When(
unblocked_for__gte=Value(
timezone.timedelta(seconds=THRESHOLD_UNBLOCKED_WAITING_TIME)
),
then=1,
)
)
),
)
)

self.tasks_unblocked_queue_meter.set(
unblocked_tasks_stats["unblocked_tasks_count_gte_threshold"]
)
self.tasks_longest_unblocked_time_meter.set(
unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds
)

self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")

def run(self, burst=False):
with WorkerDirectory(self.name):
signal.signal(signal.SIGINT, self._signal_handler)
Expand All @@ -400,6 +466,7 @@ def run(self, burst=False):
# Subscribe to pgsql channels
connection.connection.add_notify_handler(self._pg_notify_handler)
self.cursor.execute("LISTEN pulp_worker_cancel")
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
if burst:
self.handle_available_tasks()
else:
Expand All @@ -412,5 +479,6 @@ def run(self, burst=False):
break
self.sleep()
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
self.cursor.execute("UNLISTEN pulp_worker_cancel")
self.shutdown()
31 changes: 31 additions & 0 deletions pulpcore/tests/functional/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,37 @@ async def _send_request():
return _received_otel_span


@pytest.fixture(scope="session")
def received_otel_metrics():
"""A fixture for checking the presence of specific metrics on the otel collector server.
Ensure the collector server is up and running before executing tests with this fixture. To do
so, please, run the server as follows: python3 pulpcore/tests/functional/assets/otel_server.py
"""

def _received_otel_metric(data, retries=3):
if os.environ.get("PULP_OTEL_ENABLED") != "true":
# pretend everything is working as expected if tests are run from
# a non-configured runner
return True

async def _send_request():
async with aiohttp.ClientSession(raise_for_status=False) as session:
otel_server_url = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
async with session.post(f"{otel_server_url}/metrics_test", json=data) as response:
return response.status

while retries:
status = asyncio.run(_send_request())
if status == 200:
return True
sleep(2)
retries -= 1
return False

return _received_otel_metric


@pytest.fixture
def test_path():
return os.getenv("PYTEST_CURRENT_TEST").split()[0]
Expand Down
46 changes: 46 additions & 0 deletions pulpcore/tests/functional/api/test_tasking.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Tests related to the tasking system."""

import os
import json
import pytest
import subprocess
import time

from aiohttp import BasicAuth
from urllib.parse import urljoin
from uuid import uuid4
Expand Down Expand Up @@ -349,3 +351,47 @@ def test_task_version_prevent_pickup(dispatch_task, pulpcore_bindings):
task = pulpcore_bindings.TasksApi.read(task_href)
assert task.state == "waiting"
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})


def test_emmiting_unblocked_task_telemetry(
dispatch_task, pulpcore_bindings, pulp_settings, received_otel_metrics
):
if os.getenv("PULP_OTEL_ENABLED").lower() != "true":
pytest.skip("Need PULP_OTEL_ENABLED to run this test.")

# Checking online workers ready to get a task
workers_online = pulpcore_bindings.WorkersApi.list(online="true").count

# We need to generate long running tasks to block the workers from executing other tasks
resident_task_hrefs = [
dispatch_task("pulpcore.app.tasks.test.sleep", args=(30,))
for worker in range(workers_online)
]

# Then we dispatch a quick unblockable task just to keep it waiting in the queue
task_href = dispatch_task("pulpcore.app.tasks.test.sleep", args=(0,))

task = pulpcore_bindings.TasksApi.read(task_href)
assert task.state == "waiting"

# And trigger the metrics
assert received_otel_metrics(
{
"name": "tasks_unblocked_queue",
"description": "Number of unblocked tasks waiting in the queue.",
"unit": "tasks",
}
)

assert received_otel_metrics(
{
"name": "tasks_longest_unblocked_time",
"description": "The age of the longest waiting task.",
"unit": "seconds",
}
)

[
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
for task_href in resident_task_hrefs
]
Loading

0 comments on commit 914c3e3

Please sign in to comment.