Skip to content

Commit

Permalink
multiprocess scheduling & a lot of bugfixes (#88)
Browse files Browse the repository at this point in the history
Changes:

- add multiprocessing safe mode
- add locking helpers
- improve handling of multiple threads
- add non-blocking processing lock (for wakeups) to distribute between threads
- improved tests
- bugfixes
- updated store docs
  • Loading branch information
devkral authored Dec 31, 2024
1 parent ce18828 commit 1cb909f
Show file tree
Hide file tree
Showing 23 changed files with 605 additions and 123 deletions.
2 changes: 1 addition & 1 deletion asyncz/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.12.0"
__version__ = "0.13.0"
116 changes: 116 additions & 0 deletions asyncz/file_locking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
Portable file locking utilities.
Based partially on an example by Jonathan Feignberg in the Python
Cookbook [1] (licensed under the Python Software License) and a ctypes port by
Anatoly Techtonik for Roundup [2] (license [3]).
[1] https://code.activestate.com/recipes/65203/
[2] https://sourceforge.net/p/roundup/code/ci/default/tree/roundup/backends/portalocker.py # NOQA
[3] https://sourceforge.net/p/roundup/code/ci/default/tree/COPYING.txt
Example Usage::
>>> asyncz.file_locking import locks
>>> with open('./file', 'wb') as f:
... locks.lock(f, locks.LOCK_EX)
... f.write('Edgy')
"""

import os
from typing import IO, Any

__all__ = ("LOCK_EX", "LOCK_SH", "LOCK_NB", "lock", "unlock")


def _fd(f: IO) -> Any:
"""Get a filedescriptor from something which could be a file or an fd."""
return f.fileno() if hasattr(f, "fileno") else f


if os.name == "nt":
import msvcrt
from ctypes import POINTER, Structure, Union, WinDLL, byref, c_int64, c_ulong, c_void_p, sizeof
from ctypes.wintypes import BOOL, DWORD, HANDLE

LOCK_SH = 0 # the default
LOCK_NB = 0x1 # LOCKFILE_FAIL_IMMEDIATELY
LOCK_EX = 0x2 # LOCKFILE_EXCLUSIVE_LOCK

# --- Adapted from the pyserial project ---
# detect size of ULONG_PTR
ULONG_PTR = c_int64 if sizeof(c_ulong) != sizeof(c_void_p) else c_ulong
PVOID = c_void_p

# --- Union inside Structure by stackoverflow:3480240 ---
class _OFFSET(Structure):
_fields_ = [("Offset", DWORD), ("OffsetHigh", DWORD)]

class _OFFSET_UNION(Union):
_anonymous_ = ["_offset"]
_fields_ = [("_offset", _OFFSET), ("Pointer", PVOID)]

class OVERLAPPED(Structure):
_anonymous_ = ["_offset_union"]
_fields_ = [
("Internal", ULONG_PTR),
("InternalHigh", ULONG_PTR),
("_offset_union", _OFFSET_UNION),
("hEvent", HANDLE),
]

LPOVERLAPPED = POINTER(OVERLAPPED)

# --- Define function prototypes for extra safety ---
kernel32 = WinDLL("kernel32")
LockFileEx = kernel32.LockFileEx
LockFileEx.restype = BOOL
LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
UnlockFileEx = kernel32.UnlockFileEx
UnlockFileEx.restype = BOOL
UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

def lock(f: IO, flags: int) -> bool:
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)

def unlock(f: IO) -> bool:
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)

else:
try:
import fcntl

LOCK_SH = fcntl.LOCK_SH # shared lock
LOCK_NB = fcntl.LOCK_NB # non-blocking
LOCK_EX = fcntl.LOCK_EX
except (ImportError, AttributeError):
# File locking is not supported.
LOCK_EX = LOCK_SH = LOCK_NB = 0

# Dummy functions that don't do anything.
def lock(f: IO, flags: int) -> bool:
# File is not locked
return False

def unlock(f: IO) -> bool:
# File is unlocked
return True

else:

def lock(f: IO, flags: int) -> bool:
try:
fcntl.flock(_fd(f), flags)
return True
except BlockingIOError:
return False

def unlock(f: IO) -> bool:
fcntl.flock(_fd(f), fcntl.LOCK_UN)
return True
77 changes: 77 additions & 0 deletions asyncz/locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

import os
from collections.abc import Generator
from contextlib import contextmanager, suppress
from threading import Lock, RLock
from typing import Any, Optional

from .file_locking import LOCK_EX, LOCK_NB, lock, unlock
from .protocols import LockProtectedProtocol


class NullLockProtected(LockProtectedProtocol):
@contextmanager
def protected(
self, blocking: bool = False, timeout: Optional[int] = None
) -> Generator[bool, None, None]:
yield True

def shutdown(self) -> None:
pass


class LockProtected(LockProtectedProtocol):
def __init__(self) -> None:
self.lock = self.create_lock()

def create_lock(self) -> Lock | RLock:
return Lock()

def shutdown(self) -> None:
pass

@contextmanager
def protected(
self, blocking: bool = False, timeout: Optional[int] = None
) -> Generator[bool, None, None]:
locked = self.lock.acquire(blocking, -1 if timeout is None else timeout)
yield locked
if locked:
self.lock.release()


class RLockProtected(LockProtected):
def create_lock(self) -> Lock | RLock:
return RLock()


class FileLockProtected(LockProtectedProtocol):
file_path: str

def __init__(self, file_path: str) -> None:
kwargs: Any = {}
if r"{pgrp" in file_path:
kwargs["pgrp"] = os.getpgrp()
if r"{ppid" in file_path:
kwargs["ppid"] = os.getppid()
self.file_path = file_path.format(**kwargs)

@contextmanager
def protected(
self, blocking: bool = False, timeout: Optional[int] = None
) -> Generator[bool, None, None]:
with open(self.file_path, "w+") as file:
flags = LOCK_EX
if not blocking:
flags |= LOCK_NB
locked = lock(file, flags)
try:
yield locked
finally:
if locked:
unlock(file)

def shutdown(self) -> None:
with suppress(FileNotFoundError):
os.remove(self.file_path)
14 changes: 14 additions & 0 deletions asyncz/protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import annotations

from contextlib import AbstractContextManager
from typing import Optional, Protocol


class LockProtectedProtocol(Protocol):
"""Non-blocking locks"""

def protected(
self, blocking: bool = False, timeout: Optional[int] = None
) -> AbstractContextManager[bool]: ...

def shutdown(self) -> None: ...
4 changes: 3 additions & 1 deletion asyncz/schedulers/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import contextlib
from collections.abc import Sequence
from threading import Event, Thread
from typing import TYPE_CHECKING, Any, Optional
Expand Down Expand Up @@ -63,7 +64,8 @@ def _shutdown(self, wait: bool = True) -> None:
if thread:
self.event_loop.stop()
self.event_loop = self.event_loop_thread = None
thread.join()
with contextlib.suppress(RuntimeError):
thread.join()

def shutdown(self, wait: bool = True) -> bool:
# not decremented yet so +1
Expand Down
Loading

0 comments on commit 1cb909f

Please sign in to comment.