Skip to content

Commit

Permalink
Merge pull request #4 from rejiren/WGTE20-10
Browse files Browse the repository at this point in the history
WGTE20-10 [Monitoring] Async push metrics for jobs
  • Loading branch information
dburakov authored Aug 14, 2018
2 parents a7e80a1 + dbfbda0 commit 2dd12ff
Showing 1 changed file with 41 additions and 1 deletion.
42 changes: 41 additions & 1 deletion aiohttp_prometheus_monitoring/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
# -*- coding: utf-8 -*-

import asyncio
import logging
import traceback
from importlib import import_module
from concurrent.futures import ThreadPoolExecutor

from prometheus_client import Gauge
from prometheus_client import Gauge, push_to_gateway, REGISTRY
from prometheus_async.aio.web import server_stats

from aiohttp_prometheus_monitoring.utils import PeriodicTask
from aiohttp_prometheus_monitoring.exceptions import WrongConfiguration
from aiohttp_prometheus_monitoring.views import ping


logger = logging.getLogger()


def import_by_path(dotted_path):
try:
module_path, object_name = dotted_path.rsplit('.', 1)
Expand Down Expand Up @@ -64,3 +71,36 @@ async def setup_monitoring(config, app=None):
app.monitoring_tasks = await init_metrics(config)
app.on_shutdown.append(stop_monitoring_tasks)
init_monitoring_routes(config, app)


def _push_metrics(job_name, url, grouping_key=None, registry=REGISTRY):
try:
push_to_gateway(url, job=job_name, registry=registry, grouping_key=grouping_key)
except Exception:
logger.exception(traceback.format_exc())
else:
logger.debug('Monitoring metrics pushed for job {}.'.format(job_name))


async def coro(job_name, url, grouping_key):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=2) as executor:
await loop.run_in_executor(executor, _push_metrics, job_name, url, grouping_key)


async def periodic_coro(sleep_time, job_name, url, grouping_key):
loop = asyncio.get_event_loop()
while True:
with ThreadPoolExecutor(max_workers=2) as executor:
await loop.run_in_executor(executor, _push_metrics, job_name, url, grouping_key)
await asyncio.sleep(sleep_time)


def push_metrics(job_name, url, grouping_key=None):
return asyncio.ensure_future(coro(job_name, url, grouping_key))


def periodic_push_metrics(sleep_time, job_name, url, grouping_key=None):
return asyncio.ensure_future(periodic_coro(sleep_time, job_name, url, grouping_key))


0 comments on commit 2dd12ff

Please sign in to comment.