-
Notifications
You must be signed in to change notification settings - Fork 698
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
569e151
commit f288a95
Showing
2 changed files
with
379 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,324 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Copyright (C) 2024 Xiaomi Corporation. | ||
The ownership and intellectual property rights of Xiaomi Home Assistant | ||
Integration and related Xiaomi cloud service API interface provided under this | ||
license, including source code and object code (collectively, "Licensed Work"), | ||
are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi | ||
hereby grants you a personal, limited, non-exclusive, non-transferable, | ||
non-sublicensable, and royalty-free license to reproduce, use, modify, and | ||
distribute the Licensed Work only for your use of Home Assistant for | ||
non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize | ||
you to use the Licensed Work for any other purpose, including but not limited | ||
to use Licensed Work to develop applications (APP), Web services, and other | ||
forms of software. | ||
You may reproduce and distribute copies of the Licensed Work, with or without | ||
modifications, whether in source or object form, provided that you must give | ||
any other recipients of the Licensed Work a copy of this License and retain all | ||
copyright and disclaimers. | ||
Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR | ||
CONDITIONS OF ANY KIND, either express or implied, including, without | ||
limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR | ||
OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or | ||
FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible | ||
for any direct, indirect, special, incidental, or consequential damages or | ||
losses arising from the use or inability to use the Licensed Work. | ||
Xiaomi reserves all rights not expressly granted to you in this License. | ||
Except for the rights expressly granted by Xiaomi under this License, Xiaomi | ||
does not authorize you in any form to use the trademarks, copyrights, or other | ||
forms of intellectual property rights of Xiaomi and its affiliates, including, | ||
without limitation, without obtaining other written permission from Xiaomi, you | ||
shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that | ||
may make the public associate with Xiaomi in any form to publicize or promote | ||
the software or hardware devices that use the Licensed Work. | ||
Xiaomi has the right to immediately terminate all your authorization under this | ||
License in the event: | ||
1. You assert patent invalidation, litigation, or other claims against patents | ||
or other intellectual property rights of Xiaomi or its affiliates; or, | ||
2. You make, have made, manufacture, sell, or offer to sell products that knock | ||
off Xiaomi or its affiliates' products. | ||
MIoT event loop. | ||
""" | ||
import selectors | ||
import heapq | ||
import time | ||
import traceback | ||
from typing import Callable, TypeVar | ||
import logging | ||
import threading | ||
|
||
# pylint: disable=relative-beyond-top-level | ||
from .miot_error import MIoTEvError | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
TimeoutHandle = TypeVar('TimeoutHandle') | ||
|
||
|
||
class MIoTFdHandler: | ||
"""File descriptor handler.""" | ||
fd: int | ||
read_handler: Callable[[any], None] | ||
read_handler_ctx: any | ||
write_handler: Callable[[any], None] | ||
write_handler_ctx: any | ||
|
||
def __init__( | ||
self, fd: int, | ||
read_handler: Callable[[any], None] = None, | ||
read_handler_ctx: any = None, | ||
write_handler: Callable[[any], None] = None, | ||
write_handler_ctx: any = None | ||
) -> None: | ||
self.fd = fd | ||
self.read_handler = read_handler | ||
self.read_handler_ctx = read_handler_ctx | ||
self.write_handler = write_handler | ||
self.write_handler_ctx = write_handler_ctx | ||
|
||
|
||
class MIoTTimeout: | ||
"""Timeout handler.""" | ||
key: TimeoutHandle | ||
target: int | ||
handler: Callable[[any], None] | ||
handler_ctx: any | ||
|
||
def __init__( | ||
self, key: str = None, target: int = None, | ||
handler: Callable[[any], None] = None, | ||
handler_ctx: any = None | ||
) -> None: | ||
self.key = key | ||
self.target = target | ||
self.handler = handler | ||
self.handler_ctx = handler_ctx | ||
|
||
def __lt__(self, other): | ||
return self.target < other.target | ||
|
||
|
||
class MIoTEventLoop: | ||
"""MIoT event loop.""" | ||
_poll_fd: selectors.DefaultSelector | ||
|
||
_fd_handlers: dict[str, MIoTFdHandler] | ||
|
||
_timer_heap: list[MIoTTimeout] | ||
_timer_handlers: dict[str, MIoTTimeout] | ||
_timer_handle_seed: int | ||
|
||
# Label if the current fd handler is freed inside a read handler to | ||
# avoid invalid reading. | ||
_fd_handler_freed_in_read_handler: bool | ||
|
||
def __init__(self) -> None: | ||
self._poll_fd = selectors.DefaultSelector() | ||
self._timer_heap = [] | ||
self._timer_handlers = {} | ||
self._timer_handle_seed = 1 | ||
self._fd_handlers = {} | ||
self._fd_handler_freed_in_read_handler = False | ||
|
||
def loop_forever(self) -> None: | ||
"""Run an event loop in current thread.""" | ||
next_timeout: int | ||
while True: | ||
next_timeout = 0 | ||
# Handle timer | ||
now_ms: int = self.__get_monotonic_ms | ||
while len(self._timer_heap) > 0: | ||
timer: MIoTTimeout = self._timer_heap[0] | ||
if timer is None: | ||
break | ||
if timer.target <= now_ms: | ||
heapq.heappop(self._timer_heap) | ||
del self._timer_handlers[timer.key] | ||
if timer.handler: | ||
timer.handler(timer.handler_ctx) | ||
else: | ||
next_timeout = timer.target-now_ms | ||
break | ||
# Are there any files to listen to | ||
if next_timeout == 0 and self._fd_handlers: | ||
next_timeout = None # None == infinite | ||
# Wait for timers & fds | ||
if next_timeout == 0: | ||
# Neither timer nor fds exist, exit loop | ||
break | ||
# Handle fd event | ||
events = self._poll_fd.select( | ||
timeout=next_timeout/1000.0 if next_timeout else next_timeout) | ||
for key, mask in events: | ||
fd_handler: MIoTFdHandler = key.data | ||
if fd_handler is None: | ||
continue | ||
self._fd_handler_freed_in_read_handler = False | ||
fd_key = str(id(fd_handler.fd)) | ||
if fd_key not in self._fd_handlers: | ||
continue | ||
if ( | ||
mask & selectors.EVENT_READ > 0 | ||
and fd_handler.read_handler | ||
): | ||
fd_handler.read_handler(fd_handler.read_handler_ctx) | ||
if ( | ||
mask & selectors.EVENT_WRITE > 0 | ||
and self._fd_handler_freed_in_read_handler is False | ||
and fd_handler.write_handler | ||
): | ||
fd_handler.write_handler(fd_handler.write_handler_ctx) | ||
|
||
def loop_stop(self) -> None: | ||
"""Stop the event loop.""" | ||
if self._poll_fd: | ||
self._poll_fd.close() | ||
self._poll_fd = None | ||
self._fd_handlers = {} | ||
self._timer_heap = [] | ||
self._timer_handlers = {} | ||
|
||
def set_timeout( | ||
self, timeout_ms: int, handler: Callable[[any], None], | ||
handler_ctx: any = None | ||
) -> TimeoutHandle: | ||
"""Set a timer.""" | ||
if timeout_ms is None or handler is None: | ||
raise MIoTEvError('invalid params') | ||
new_timeout: MIoTTimeout = MIoTTimeout() | ||
new_timeout.key = self.__get_next_timeout_handle | ||
new_timeout.target = self.__get_monotonic_ms + timeout_ms | ||
new_timeout.handler = handler | ||
new_timeout.handler_ctx = handler_ctx | ||
heapq.heappush(self._timer_heap, new_timeout) | ||
self._timer_handlers[new_timeout.key] = new_timeout | ||
return new_timeout.key | ||
|
||
def clear_timeout(self, timer_key: TimeoutHandle) -> None: | ||
"""Stop and remove the timer.""" | ||
if timer_key is None: | ||
return | ||
timer: MIoTTimeout = self._timer_handlers.pop(timer_key, None) | ||
if timer: | ||
self._timer_heap = list(self._timer_heap) | ||
self._timer_heap.remove(timer) | ||
heapq.heapify(self._timer_heap) | ||
|
||
def set_read_handler( | ||
self, fd: int, handler: Callable[[any], None], handler_ctx: any = None | ||
) -> bool: | ||
"""Set a read handler for a file descriptor. | ||
Returns: | ||
bool: True, success. False, failed. | ||
""" | ||
self.__set_handler( | ||
fd, is_read=True, handler=handler, handler_ctx=handler_ctx) | ||
|
||
def set_write_handler( | ||
self, fd: int, handler: Callable[[any], None], handler_ctx: any = None | ||
) -> bool: | ||
"""Set a write handler for a file descriptor. | ||
Returns: | ||
bool: True, success. False, failed. | ||
""" | ||
self.__set_handler( | ||
fd, is_read=False, handler=handler, handler_ctx=handler_ctx) | ||
|
||
def __set_handler( | ||
self, fd, is_read: bool, handler: Callable[[any], None], | ||
handler_ctx: any = None | ||
) -> bool: | ||
"""Set a handler.""" | ||
if fd is None: | ||
raise MIoTEvError('invalid params') | ||
|
||
if not self._poll_fd: | ||
raise MIoTEvError('event loop not started') | ||
|
||
fd_key: str = str(id(fd)) | ||
fd_handler = self._fd_handlers.get(fd_key, None) | ||
|
||
if fd_handler is None: | ||
fd_handler = MIoTFdHandler(fd=fd) | ||
fd_handler.fd = fd | ||
self._fd_handlers[fd_key] = fd_handler | ||
|
||
read_handler_existed = fd_handler.read_handler is not None | ||
write_handler_existed = fd_handler.write_handler is not None | ||
if is_read is True: | ||
fd_handler.read_handler = handler | ||
fd_handler.read_handler_ctx = handler_ctx | ||
else: | ||
fd_handler.write_handler = handler | ||
fd_handler.write_handler_ctx = handler_ctx | ||
|
||
if fd_handler.read_handler is None and fd_handler.write_handler is None: | ||
# Remove from epoll and map | ||
try: | ||
self._poll_fd.unregister(fd) | ||
except (KeyError, ValueError, OSError) as e: | ||
del e | ||
self._fd_handlers.pop(fd_key, None) | ||
# May be inside a read handler, if not, this has no effect | ||
self._fd_handler_freed_in_read_handler = True | ||
elif read_handler_existed is False and write_handler_existed is False: | ||
# Add to epoll | ||
events = 0x0 | ||
if fd_handler.read_handler: | ||
events |= selectors.EVENT_READ | ||
if fd_handler.write_handler: | ||
events |= selectors.EVENT_WRITE | ||
try: | ||
self._poll_fd.register(fd, events=events, data=fd_handler) | ||
except (KeyError, ValueError, OSError) as e: | ||
_LOGGER.error( | ||
'%s, register fd, error, %s, %s, %s, %s, %s', | ||
threading.current_thread().name, | ||
'read' if is_read else 'write', | ||
fd_key, handler, e, traceback.format_exc()) | ||
self._fd_handlers.pop(fd_key, None) | ||
return False | ||
elif ( | ||
read_handler_existed != (fd_handler.read_handler is not None) | ||
or write_handler_existed != (fd_handler.write_handler is not None) | ||
): | ||
# Modify epoll | ||
events = 0x0 | ||
if fd_handler.read_handler: | ||
events |= selectors.EVENT_READ | ||
if fd_handler.write_handler: | ||
events |= selectors.EVENT_WRITE | ||
try: | ||
self._poll_fd.modify(fd, events=events, data=fd_handler) | ||
except (KeyError, ValueError, OSError) as e: | ||
_LOGGER.error( | ||
'%s, modify fd, error, %s, %s, %s, %s, %s', | ||
threading.current_thread().name, | ||
'read' if is_read else 'write', | ||
fd_key, handler, e, traceback.format_exc()) | ||
self._fd_handlers.pop(fd_key, None) | ||
return False | ||
|
||
return True | ||
|
||
@property | ||
def __get_next_timeout_handle(self) -> str: | ||
# Get next timeout handle, that is not larger than the maximum | ||
# value of UINT64 type. | ||
self._timer_handle_seed += 1 | ||
# uint64 max | ||
self._timer_handle_seed %= 0xFFFFFFFFFFFFFFFF | ||
return str(self._timer_handle_seed) | ||
|
||
@property | ||
def __get_monotonic_ms(self) -> int: | ||
"""Get monotonic ms timestamp.""" | ||
return int(time.monotonic()*1000) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Unit test for miot_ev.py.""" | ||
import os | ||
import pytest | ||
|
||
# pylint: disable=import-outside-toplevel, disable=unused-argument | ||
|
||
|
||
@pytest.mark.github | ||
def test_mev_timer_and_fd(): | ||
from miot.miot_ev import MIoTEventLoop, TimeoutHandle | ||
|
||
mev = MIoTEventLoop() | ||
assert mev | ||
event_fd: os.eventfd = os.eventfd(0, os.O_NONBLOCK) | ||
assert event_fd | ||
timer4: TimeoutHandle = None | ||
|
||
def event_handler(event_fd): | ||
value: int = os.eventfd_read(event_fd) | ||
if value == 1: | ||
mev.clear_timeout(timer4) | ||
print('cancel timer4') | ||
elif value == 2: | ||
print('event write twice in a row') | ||
elif value == 3: | ||
mev.set_read_handler(event_fd, None, None) | ||
os.close(event_fd) | ||
event_fd = None | ||
print('close event fd') | ||
|
||
def timer1_handler(event_fd): | ||
os.eventfd_write(event_fd, 1) | ||
|
||
def timer2_handler(event_fd): | ||
os.eventfd_write(event_fd, 1) | ||
os.eventfd_write(event_fd, 1) | ||
|
||
def timer3_handler(event_fd): | ||
os.eventfd_write(event_fd, 3) | ||
|
||
def timer4_handler(event_fd): | ||
raise ValueError('unreachable code') | ||
|
||
mev.set_read_handler( | ||
event_fd, event_handler, event_fd) | ||
|
||
mev.set_timeout(500, timer1_handler, event_fd) | ||
mev.set_timeout(1000, timer2_handler, event_fd) | ||
mev.set_timeout(1500, timer3_handler, event_fd) | ||
timer4 = mev.set_timeout(2000, timer4_handler, event_fd) | ||
|
||
mev.loop_forever() | ||
# Loop will exit when there are no timers or fd handlers. | ||
mev.loop_stop() |