From 21b02d0167fb3eddaeee26a03487d4fc63f5bfb7 Mon Sep 17 00:00:00 2001 From: Vladislav Yarmak Date: Sun, 26 Jan 2020 17:10:10 +0200 Subject: [PATCH] switch to bundled asdnotify --- rsp/__main__.py | 12 ++++---- rsp/asdnotify.py | 73 ++++++++++++++++++++++++++++++++++++++++++++++++ setup.py | 1 - 3 files changed, 79 insertions(+), 7 deletions(-) create mode 100644 rsp/asdnotify.py diff --git a/rsp/__main__.py b/rsp/__main__.py index 1676177..e92dc9c 100644 --- a/rsp/__main__.py +++ b/rsp/__main__.py @@ -8,9 +8,9 @@ from functools import partial import os.path -from sdnotify import SystemdNotifier import asyncssh +from .asdnotify import AsyncSystemdNotifier from .constants import LogLevel from . import utils from .ssh_pool import SSHPool @@ -160,12 +160,12 @@ async def amain(args, loop): # pragma: no cover sig_handler = partial(utils.exit_handler, exit_event) signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGINT, sig_handler) - notifier = await loop.run_in_executor(None, SystemdNotifier) - await loop.run_in_executor(None, notifier.notify, "READY=1") - await exit_event.wait() + async with AsyncSystemdNotifier() as notifier: + await notifier.notify(b"READY=1") + await exit_event.wait() - logger.debug("Eventloop interrupted. Shutting down server...") - await loop.run_in_executor(None, notifier.notify, "STOPPING=1") + logger.debug("Eventloop interrupted. Shutting down server...") + await notifier.notify(b"STOPPING=1") beat.cancel() diff --git a/rsp/asdnotify.py b/rsp/asdnotify.py new file mode 100644 index 0000000..bbe47dd --- /dev/null +++ b/rsp/asdnotify.py @@ -0,0 +1,73 @@ +import os +import socket +import asyncio + +MAX_QLEN = 128 + +class AsyncSystemdNotifier: + def __init__(self): + env_var = os.getenv('NOTIFY_SOCKET') + self._addr = ('\0' + env_var[1:] + if env_var is not None and env_var.startswith('@') + else env_var) + self._sock = None + self._started = False + self._loop = None + self._queue = asyncio.Queue(MAX_QLEN) + self._monitor = False + + @property + def started(self): + return self._started + + def _drain(self): + while not self._queue.empty(): + msg = self._queue.get_nowait() + self._queue.task_done() + try: + self._send(msg) + except BlockingIOError: # pragma: no cover + self._monitor = True + self._loop.add_writer(self._sock.fileno(), self._drain) + break + except OSError: + pass + else: + if self._monitor: + self._monitor = False + self._loop.remove_writer(self._sock.fileno()) + + def _send(self, data): + return self._sock.sendto(data, socket.MSG_NOSIGNAL, self._addr) + + async def start(self): + if self._addr is None: + return False + self._loop = asyncio.get_event_loop() + try: + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._sock.setblocking(0) + self._started = True + except OSError: + return False + return True + + async def notify(self, status): + if self._started: + await self._queue.put(status) + self._drain() + + async def stop(self): + if self._started: + self._started = False + await self._queue.join() + if self._monitor: + self._loop.remove_writer(self._sock.fileno()) + self._sock.close() + + async def __aenter__(self): + await self.start() + return self + + async def __aexit__(self, exc_type, exc, traceback): + await self.stop() diff --git a/setup.py b/setup.py index b547781..09ba84f 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,6 @@ 'wheel', ], install_requires=[ - 'sdnotify>=0.3.2', 'asyncssh>=1.16.0', ], extras_require={