Skip to content

Commit

Permalink
Merge pull request #4 from Snawoot/asdnotify
Browse files Browse the repository at this point in the history
Asdnotify
  • Loading branch information
Snawoot authored Jan 26, 2020
2 parents 8cb23ad + 21b02d0 commit 634140c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 7 deletions.
12 changes: 6 additions & 6 deletions rsp/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from functools import partial
import os.path

from sdnotify import SystemdNotifier
import asyncssh

from .asdnotify import AsyncSystemdNotifier
from .constants import LogLevel
from . import utils
from .ssh_pool import SSHPool
Expand Down Expand Up @@ -160,12 +160,12 @@ async def amain(args, loop): # pragma: no cover
sig_handler = partial(utils.exit_handler, exit_event)
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
notifier = await loop.run_in_executor(None, SystemdNotifier)
await loop.run_in_executor(None, notifier.notify, "READY=1")
await exit_event.wait()
async with AsyncSystemdNotifier() as notifier:
await notifier.notify(b"READY=1")
await exit_event.wait()

logger.debug("Eventloop interrupted. Shutting down server...")
await loop.run_in_executor(None, notifier.notify, "STOPPING=1")
logger.debug("Eventloop interrupted. Shutting down server...")
await notifier.notify(b"STOPPING=1")
beat.cancel()


Expand Down
73 changes: 73 additions & 0 deletions rsp/asdnotify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
import socket
import asyncio

MAX_QLEN = 128

class AsyncSystemdNotifier:
def __init__(self):
env_var = os.getenv('NOTIFY_SOCKET')
self._addr = ('\0' + env_var[1:]
if env_var is not None and env_var.startswith('@')
else env_var)
self._sock = None
self._started = False
self._loop = None
self._queue = asyncio.Queue(MAX_QLEN)
self._monitor = False

@property
def started(self):
return self._started

def _drain(self):
while not self._queue.empty():
msg = self._queue.get_nowait()
self._queue.task_done()
try:
self._send(msg)
except BlockingIOError: # pragma: no cover
self._monitor = True
self._loop.add_writer(self._sock.fileno(), self._drain)
break
except OSError:
pass
else:
if self._monitor:
self._monitor = False
self._loop.remove_writer(self._sock.fileno())

def _send(self, data):
return self._sock.sendto(data, socket.MSG_NOSIGNAL, self._addr)

async def start(self):
if self._addr is None:
return False
self._loop = asyncio.get_event_loop()
try:
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self._sock.setblocking(0)
self._started = True
except OSError:
return False
return True

async def notify(self, status):
if self._started:
await self._queue.put(status)
self._drain()

async def stop(self):
if self._started:
self._started = False
await self._queue.join()
if self._monitor:
self._loop.remove_writer(self._sock.fileno())
self._sock.close()

async def __aenter__(self):
await self.start()
return self

async def __aexit__(self, exc_type, exc, traceback):
await self.stop()
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
'wheel',
],
install_requires=[
'sdnotify>=0.3.2',
'asyncssh>=1.16.0',
],
extras_require={
Expand Down

0 comments on commit 634140c

Please sign in to comment.