diff --git a/aiohttp_prometheus_monitoring/__init__.py b/aiohttp_prometheus_monitoring/__init__.py index 27b19fc..8783346 100644 --- a/aiohttp_prometheus_monitoring/__init__.py +++ b/aiohttp_prometheus_monitoring/__init__.py @@ -1,8 +1,12 @@ # -*- coding: utf-8 -*- +import asyncio +import logging +import traceback from importlib import import_module +from concurrent.futures import ThreadPoolExecutor -from prometheus_client import Gauge +from prometheus_client import Gauge, push_to_gateway, REGISTRY from prometheus_async.aio.web import server_stats from aiohttp_prometheus_monitoring.utils import PeriodicTask @@ -10,6 +14,9 @@ from aiohttp_prometheus_monitoring.views import ping +logger = logging.getLogger() + + def import_by_path(dotted_path): try: module_path, object_name = dotted_path.rsplit('.', 1) @@ -64,3 +71,36 @@ async def setup_monitoring(config, app=None): app.monitoring_tasks = await init_metrics(config) app.on_shutdown.append(stop_monitoring_tasks) init_monitoring_routes(config, app) + + +def _push_metrics(job_name, url, grouping_key=None, registry=REGISTRY): + try: + push_to_gateway(url, job=job_name, registry=registry, grouping_key=grouping_key) + except Exception: + logger.exception(traceback.format_exc()) + else: + logger.debug('Monitoring metrics pushed for job {}.'.format(job_name)) + + +async def coro(job_name, url, grouping_key): + loop = asyncio.get_event_loop() + with ThreadPoolExecutor(max_workers=2) as executor: + await loop.run_in_executor(executor, _push_metrics, job_name, url, grouping_key) + + +async def periodic_coro(sleep_time, job_name, url, grouping_key): + loop = asyncio.get_event_loop() + while True: + with ThreadPoolExecutor(max_workers=2) as executor: + await loop.run_in_executor(executor, _push_metrics, job_name, url, grouping_key) + await asyncio.sleep(sleep_time) + + +def push_metrics(job_name, url, grouping_key=None): + return asyncio.ensure_future(coro(job_name, url, grouping_key)) + + +def periodic_push_metrics(sleep_time, job_name, url, grouping_key=None): + return asyncio.ensure_future(periodic_coro(sleep_time, job_name, url, grouping_key)) + +