diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index 6c6b358..6c5eccd 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -48,8 +48,6 @@ import asyncio import json import logging -import os -import queue import random import re import ssl @@ -58,19 +56,19 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum, auto -from typing import Any, Callable, Optional, final +from typing import Any, Callable, Optional, final, Coroutine from paho.mqtt.client import ( MQTT_ERR_SUCCESS, MQTT_ERR_UNKNOWN, Client, - MQTTv5) + MQTTv5, + MQTTMessage) # pylint: disable=relative-beyond-top-level from .common import MIoTMatcher from .const import MIHOME_MQTT_KEEPALIVE from .miot_error import MIoTErrorCode, MIoTMipsError -from .miot_ev import MIoTEventLoop, TimeoutHandle _LOGGER = logging.getLogger(__name__) @@ -87,12 +85,12 @@ class MipsMsgTypeOptions(Enum): class MipsMessage: """MIoT Pub/Sub message.""" mid: int = 0 - msg_from: str = None - ret_topic: str = None - payload: str = None + msg_from: str | None = None + ret_topic: str | None = None + payload: str | None = None @staticmethod - def unpack(data: bytes): + def unpack(data: bytes) -> 'MipsMessage': mips_msg = MipsMessage() data_len = len(data) data_start = 0 @@ -122,7 +120,10 @@ def unpack(data: bytes): @staticmethod def pack( - mid: int, payload: str, msg_from: str = None, ret_topic: str = None + mid: int, + payload: str, + msg_from: str | None = None, + ret_topic: str | None = None ) -> bytes: if mid is None or payload is None: raise MIoTMipsError('invalid mid or payload') @@ -152,125 +153,48 @@ def __str__(self) -> str: return f'{self.mid}, {self.msg_from}, {self.ret_topic}, {self.payload}' -class MipsCmdType(Enum): - """MIoT Pub/Sub command type.""" - CONNECT = 0 - DISCONNECT = auto() - DEINIT = auto() - SUB = auto() - UNSUB = auto() - CALL_API = auto() - REG_BROADCAST = auto() - UNREG_BROADCAST = auto() - - REG_MIPS_STATE = auto() - UNREG_MIPS_STATE = auto() - REG_DEVICE_STATE = auto() - UNREG_DEVICE_STATE = auto() - - -@dataclass -class MipsCmd: - """MIoT Pub/Sub command.""" - type_: MipsCmdType - data: Any - - def __init__(self, type_: MipsCmdType, data: Any) -> None: - self.type_ = type_ - self.data = data - - @dataclass class MipsRequest: """MIoT Pub/Sub request.""" - mid: int = None - on_reply: Callable[[str, Any], None] = None - on_reply_ctx: Any = None - timer: TimeoutHandle = None - - -@dataclass -class MipsRequestData: - """MIoT Pub/Sub request data.""" - topic: str = None - payload: str = None - on_reply: Callable[[str, Any], None] = None - on_reply_ctx: Any = None - timeout_ms: int = None - - -@dataclass -class MipsSendBroadcastData: - """MIoT Pub/Sub send broadcast data.""" - topic: str = None - payload: str = None + mid: int + on_reply: Callable[[str, Any], None] + on_reply_ctx: Any + timer: asyncio.TimerHandle | None @dataclass class MipsIncomingApiCall: """MIoT Pub/Sub incoming API call.""" - mid: int = None - ret_topic: str = None - timer: TimeoutHandle = None - - -@dataclass -class MipsApi: - """MIoT Pub/Sub API.""" - topic: str = None - """ - param1: session - param2: payload - param3: handler_ctx - """ - handler: Callable[[MipsIncomingApiCall, str, Any], None] = None - handler_ctx: Any = None - - -class MipsRegApi(MipsApi): - """.MIoT Pub/Sub register API.""" - - -@dataclass -class MipsReplyData: - """MIoT Pub/Sub reply data.""" - session: MipsIncomingApiCall = None - payload: str = None + mid: int | None = None + ret_topic: str | None = None + timer: asyncio.TimerHandle | None = None @dataclass class MipsBroadcast: """MIoT Pub/Sub broadcast.""" - topic: str = None + topic: str """ param 1: msg topic param 2: msg payload param 3: handle_ctx """ - handler: Callable[[str, str, Any], None] = None - handler_ctx: Any = None + handler: Callable[[str, str, Any], None] + handler_ctx: Any def __str__(self) -> str: return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}' -class MipsRegBroadcast(MipsBroadcast): - """MIoT Pub/Sub register broadcast.""" - - @dataclass class MipsState: """MIoT Pub/Sub state.""" - key: str = None + key: str """ str: key bool: mips connect state """ - handler: Callable[[str, bool], asyncio.Future] = None - - -class MipsRegState(MipsState): - """MIoT Pub/Sub register state.""" + handler: Callable[[str, bool], Coroutine] class MIoTDeviceState(Enum): @@ -280,72 +204,54 @@ class MIoTDeviceState(Enum): ONLINE = auto() -@dataclass -class MipsDeviceState: - """MIoT Pub/Sub device state.""" - did: str = None - """handler - str: did - MIoTDeviceState: online/offline/disable - Any: ctx - """ - handler: Callable[[str, MIoTDeviceState, Any], None] = None - handler_ctx: Any = None - - -class MipsRegDeviceState(MipsDeviceState): - """MIoT Pub/Sub register device state.""" - - class MipsClient(ABC): """MIoT Pub/Sub client.""" # pylint: disable=unused-argument - MQTT_INTERVAL_MS = 1000 + MQTT_INTERVAL_S = 1 MIPS_QOS: int = 2 UINT32_MAX: int = 0xFFFFFFFF - MIPS_RECONNECT_INTERVAL_MIN: int = 30000 - MIPS_RECONNECT_INTERVAL_MAX: int = 600000 + MIPS_RECONNECT_INTERVAL_MIN: float = 30 + MIPS_RECONNECT_INTERVAL_MAX: float = 600 MIPS_SUB_PATCH: int = 300 - MIPS_SUB_INTERVAL: int = 1000 + MIPS_SUB_INTERVAL: float = 1 main_loop: asyncio.AbstractEventLoop - _logger: logging.Logger + _logger: logging.Logger | None _client_id: str _host: str _port: int - _username: str - _password: str - _ca_file: str - _cert_file: str - _key_file: str + _username: str | None + _password: str | None + _ca_file: str | None + _cert_file: str | None + _key_file: str | None - _mqtt_logger: logging.Logger + _mqtt_logger: logging.Logger | None _mqtt: Client _mqtt_fd: int - _mqtt_timer: TimeoutHandle + _mqtt_timer: asyncio.TimerHandle | None _mqtt_state: bool _event_connect: asyncio.Event _event_disconnect: asyncio.Event - _mev: MIoTEventLoop + _internal_loop: asyncio.AbstractEventLoop _mips_thread: threading.Thread - _mips_queue: queue.Queue - _cmd_event_fd: os.eventfd _mips_reconnect_tag: bool - _mips_reconnect_interval: int - _mips_reconnect_timer: Optional[TimeoutHandle] + _mips_reconnect_interval: float + _mips_reconnect_timer: Optional[asyncio.TimerHandle] _mips_state_sub_map: dict[str, MipsState] _mips_sub_pending_map: dict[str, int] - _mips_sub_pending_timer: Optional[TimeoutHandle] - - _on_mips_cmd: Callable[[MipsCmd], None] - _on_mips_message: Callable[[str, bytes], None] - _on_mips_connect: Callable[[int, dict], None] - _on_mips_disconnect: Callable[[int, dict], None] + _mips_sub_pending_timer: Optional[asyncio.TimerHandle] def __init__( - self, client_id: str, host: str, port: int, - username: str = None, password: str = None, - ca_file: str = None, cert_file: str = None, key_file: str = None, + self, + client_id: + str, host: str, + port: int, + username: Optional[str] = None, + password: Optional[str] = None, + ca_file: Optional[str] = None, + cert_file: Optional[str] = None, + key_file: Optional[str] = None, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: # MUST run with running loop @@ -378,20 +284,8 @@ def __init__( self._mips_state_sub_map = {} self._mips_sub_pending_map = {} self._mips_sub_pending_timer = None - self._mev = MIoTEventLoop() - self._mips_queue = queue.Queue() - self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK) - self.mev_set_read_handler( - self._cmd_event_fd, self.__mips_cmd_read_handler, None) - self._mips_thread = threading.Thread(target=self.__mips_loop_thread) - self._mips_thread.daemon = True - self._mips_thread.name = self._client_id - self._mips_thread.start() + # DO NOT start the thread yet. Do that on connect - self._on_mips_cmd = None - self._on_mips_message = None - self._on_mips_connect = None - self._on_mips_disconnect = None @property def client_id(self) -> str: @@ -416,28 +310,37 @@ def mips_state(self) -> bool: return self._mqtt and self._mqtt.is_connected() @final - def mips_deinit(self) -> None: - self._mips_send_cmd(type_=MipsCmdType.DEINIT, data=None) + def connect(self) -> None: + """mips connect.""" + # TODO: make this more precise + # Mark as not closed, though also not connected yet + self._is_closed = False + # Start mips thread + self._internal_loop = asyncio.new_event_loop() + self._mips_thread = threading.Thread(target=self.__mips_loop_thread) + self._mips_thread.daemon = True + self._mips_thread.name = self._client_id + self._mips_thread.start() + + @final + def close(self) -> None: + self._is_connected = False + + self._internal_loop.call_soon_threadsafe(self.__mips_close) self._mips_thread.join() - self._mips_thread = None + self._internal_loop.close() self._logger = None - self._client_id = None - self._host = None - self._port = None self._username = None self._password = None self._ca_file = None self._cert_file = None self._key_file = None self._mqtt_logger = None - self._mips_state_sub_map = None - self._mips_sub_pending_map = None + self._mips_state_sub_map.clear() + self._mips_sub_pending_map.clear() self._mips_sub_pending_timer = None - self._event_connect = None - self._event_disconnect = None - def update_mqtt_password(self, password: str) -> None: self._password = password self._mqtt.username_pw_set( @@ -466,166 +369,85 @@ def enable_mqtt_logger( else: self._mqtt.disable_logger() - @final - def mips_connect(self) -> None: - """mips connect.""" - return self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None) - @final async def mips_connect_async(self) -> None: """mips connect async.""" - self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None) - return await self._event_connect.wait() + self.connect() + await self._event_connect.wait() @final def mips_disconnect(self) -> None: """mips disconnect.""" - return self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None) + self._internal_loop.call_soon_threadsafe(self.__mips_disconnect) @final async def mips_disconnect_async(self) -> None: """mips disconnect async.""" - self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None) - return await self._event_disconnect.wait() + self.mips_disconnect() + await self._event_disconnect.wait() @final def sub_mips_state( - self, key: str, handler: Callable[[str, bool], asyncio.Future] + self, key: str, handler: Callable[[str, bool], Coroutine] ) -> bool: """Subscribe mips state. NOTICE: callback to main loop thread """ if isinstance(key, str) is False or handler is None: raise MIoTMipsError('invalid params') - return self._mips_send_cmd( - type_=MipsCmdType.REG_MIPS_STATE, - data=MipsRegState(key=key, handler=handler)) + self._internal_loop.call_soon_threadsafe( + self.__sub_mips_state, key, handler) + return True @final def unsub_mips_state(self, key: str) -> bool: """Unsubscribe mips state.""" if isinstance(key, str) is False: raise MIoTMipsError('invalid params') - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_MIPS_STATE, data=MipsRegState(key=key)) - - @final - def mev_set_timeout( - self, timeout_ms: int, handler: Callable[[Any], None], - handler_ctx: Any = None - ) -> Optional[TimeoutHandle]: - """set timeout. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return None - return self._mev.set_timeout( - timeout_ms=timeout_ms, handler=handler, handler_ctx=handler_ctx) - - @final - def mev_clear_timeout(self, handle: TimeoutHandle) -> None: - """clear timeout. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return - self._mev.clear_timeout(handle) - - @final - def mev_set_read_handler( - self, fd: int, handler: Callable[[Any], None], handler_ctx: Any - ) -> bool: - """set read handler. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return False - return self._mev.set_read_handler( - fd=fd, handler=handler, handler_ctx=handler_ctx) - - @final - def mev_set_write_handler( - self, fd: int, handler: Callable[[Any], None], handler_ctx: Any - ) -> bool: - """set write handler. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return False - return self._mev.set_write_handler( - fd=fd, handler=handler, handler_ctx=handler_ctx) - - @property - def on_mips_cmd(self) -> Callable[[MipsCmd], None]: - return self._on_mips_cmd - - @on_mips_cmd.setter - def on_mips_cmd(self, handler: Callable[[MipsCmd], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the **mips** thread - """ - self._on_mips_cmd = handler - - @property - def on_mips_message(self) -> Callable[[str, bytes], None]: - return self._on_mips_message - - @on_mips_message.setter - def on_mips_message(self, handler: Callable[[str, bytes], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the **mips** thread - """ - self._on_mips_message = handler - - @property - def on_mips_connect(self) -> Callable[[int, dict], None]: - return self._on_mips_connect - - @on_mips_connect.setter - def on_mips_connect(self, handler: Callable[[int, dict], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the - **main loop** thread - """ - self._on_mips_connect = handler - - @property - def on_mips_disconnect(self) -> Callable[[int, dict], None]: - return self._on_mips_disconnect - - @on_mips_disconnect.setter - def on_mips_disconnect(self, handler: Callable[[int, dict], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the - **main loop** thread - """ - self._on_mips_disconnect = handler + self._internal_loop.call_soon_threadsafe(self.__unsub_mips_state, key) + return True @abstractmethod def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: ... @abstractmethod def unsub_prop( - self, did: str, siid: int = None, piid: int = None + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None ) -> bool: ... @abstractmethod def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: ... @abstractmethod def unsub_event( - self, did: str, siid: int = None, eiid: int = None + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None ) -> bool: ... @abstractmethod async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, + payload: Optional[str] = None, + timeout_ms: int = 10000 ) -> dict[str, dict]: ... @abstractmethod @@ -637,13 +459,22 @@ async def get_prop_async( async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 - ) -> bool: ... + ) -> dict: ... @abstractmethod async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 - ) -> tuple[bool, list]: ... + ) -> dict: ... + + @abstractmethod + def _on_mips_message(self, topic: str, payload: bytes) -> None:... + + @abstractmethod + def _on_mips_connect(self, rc: int, props: dict) -> None:... + + @abstractmethod + def _on_mips_disconnect(self, rc: int, props: dict) -> None:... @final def _mips_sub_internal(self, topic: str) -> None: @@ -657,8 +488,8 @@ def _mips_sub_internal(self, topic: str) -> None: if topic not in self._mips_sub_pending_map: self._mips_sub_pending_map[topic] = 0 if not self._mips_sub_pending_timer: - self._mips_sub_pending_timer = self.mev_set_timeout( - 10, self.__mips_sub_internal_pending_handler, topic) + self._mips_sub_pending_timer = self._internal_loop.call_later( + 0.01, self.__mips_sub_internal_pending_handler, topic) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'mips sub internal error, {topic}. {err}') @@ -707,75 +538,24 @@ def _mips_publish_internal( self.log_error(f'mips publish internal error, {err}') return False - @final - def _mips_send_cmd(self, type_: MipsCmdType, data: Any) -> bool: - if self._mips_queue is None or self._cmd_event_fd is None: - raise MIoTMipsError('send mips cmd disable') - # Put data to queue - self._mips_queue.put(MipsCmd(type_=type_, data=data)) - # Write event fd - os.eventfd_write(self._cmd_event_fd, 1) - # self.log_debug(f'send mips cmd, {type}, {data}') - return True - def __thread_check(self) -> None: if threading.current_thread() is not self._mips_thread: raise MIoTMipsError('illegal call') - def __mips_cmd_read_handler(self, ctx: Any) -> None: - fd_value = os.eventfd_read(self._cmd_event_fd) - if fd_value == 0: - return - while self._mips_queue.empty() is False: - mips_cmd: MipsCmd = self._mips_queue.get(block=False) - if mips_cmd.type_ == MipsCmdType.CONNECT: - self._mips_reconnect_tag = True - self.__mips_try_reconnect(immediately=True) - elif mips_cmd.type_ == MipsCmdType.DISCONNECT: - self._mips_reconnect_tag = False - self.__mips_disconnect() - elif mips_cmd.type_ == MipsCmdType.DEINIT: - self.log_info('mips client recv deinit cmd') - self.__mips_disconnect() - # Close cmd event fd - if self._cmd_event_fd: - self.mev_set_read_handler( - self._cmd_event_fd, None, None) - os.close(self._cmd_event_fd) - self._cmd_event_fd = None - if self._mips_queue: - self._mips_queue = None - # ev loop stop - if self._mev: - self._mev.loop_stop() - self._mev = None - break - elif mips_cmd.type_ == MipsCmdType.REG_MIPS_STATE: - state: MipsState = mips_cmd.data - self._mips_state_sub_map[state.key] = state - self.log_debug(f'mips register mips state, {state.key}') - elif mips_cmd.type_ == MipsCmdType.UNREG_MIPS_STATE: - state: MipsState = mips_cmd.data - del self._mips_state_sub_map[state.key] - self.log_debug(f'mips unregister mips state, {state.key}') - else: - if self._on_mips_cmd: - self._on_mips_cmd(mips_cmd=mips_cmd) + def __mqtt_read_handler(self) -> None: + self.__mqtt_loop_handler() - def __mqtt_read_handler(self, ctx: Any) -> None: - self.__mqtt_loop_handler(ctx=ctx) + def __mqtt_write_handler(self) -> None: + self._internal_loop.remove_writer(self._mqtt_fd) + self.__mqtt_loop_handler() - def __mqtt_write_handler(self, ctx: Any) -> None: - self.mev_set_write_handler(self._mqtt_fd, None, None) - self.__mqtt_loop_handler(ctx=ctx) - - def __mqtt_timer_handler(self, ctx: Any) -> None: - self.__mqtt_loop_handler(ctx=ctx) + def __mqtt_timer_handler(self) -> None: + self.__mqtt_loop_handler() if self._mqtt: - self._mqtt_timer = self.mev_set_timeout( - self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None) + self._mqtt_timer = self._internal_loop.call_later( + self.MQTT_INTERVAL_S, self.__mqtt_timer_handler) - def __mqtt_loop_handler(self, ctx: Any) -> None: + def __mqtt_loop_handler(self) -> None: try: if self._mqtt: self._mqtt.loop_read() @@ -784,8 +564,8 @@ def __mqtt_loop_handler(self, ctx: Any) -> None: if self._mqtt: self._mqtt.loop_misc() if self._mqtt and self._mqtt.want_write(): - self.mev_set_write_handler( - self._mqtt_fd, self.__mqtt_write_handler, None) + self._internal_loop.add_writer( + self._mqtt_fd, self.__mqtt_write_handler) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'__mqtt_loop_handler, {err}') @@ -814,8 +594,10 @@ def __mips_loop_thread(self) -> None: self._mqtt.on_connect_fail = self.__on_connect_failed self._mqtt.on_disconnect = self.__on_disconnect self._mqtt.on_message = self.__on_message + # Connect to mips + self.__mips_start_connect_tries() # Run event loop - self._mev.loop_forever() + self._internal_loop.run_forever() self.log_info('mips_loop_thread exit!') def __on_connect(self, client, user_data, flags, rc, props) -> None: @@ -824,10 +606,8 @@ def __on_connect(self, client, user_data, flags, rc, props) -> None: self.log_info(f'mips connect, {flags}, {rc}, {props}') self._mqtt_state = True if self._on_mips_connect: - self.mev_set_timeout( - timeout_ms=0, - handler=lambda ctx: - self._on_mips_connect(rc, props)) + self._internal_loop.call_soon( + self._on_mips_connect, rc, props) for item in self._mips_state_sub_map.values(): if item.handler is None: continue @@ -838,8 +618,8 @@ def __on_connect(self, client, user_data, flags, rc, props) -> None: self._event_connect.set() self._event_disconnect.clear() - def __on_connect_failed(self, client, user_data, flags, rc) -> None: - self.log_error(f'mips connect failed, {flags}, {rc}') + def __on_connect_failed(self, client:Client, user_data:Any) -> None: + self.log_error(f'mips connect failed') # Try to reconnect self.__mips_try_reconnect() @@ -848,22 +628,20 @@ def __on_disconnect(self, client, user_data, rc, props) -> None: self.log_error(f'mips disconnect, {rc}, {props}') self._mqtt_state = False if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 # Clear retry sub if self._mips_sub_pending_timer: - self.mev_clear_timeout(self._mips_sub_pending_timer) + self._mips_sub_pending_timer.cancel() self._mips_sub_pending_timer = None self._mips_sub_pending_map = {} if self._on_mips_disconnect: - self.mev_set_timeout( - timeout_ms=0, - handler=lambda ctx: - self._on_mips_disconnect(rc, props)) + self._internal_loop.call_soon( + self._on_mips_disconnect, rc, props) # Call state sub handler for item in self._mips_state_sub_map.values(): if item.handler is None: @@ -878,23 +656,9 @@ def __on_disconnect(self, client, user_data, rc, props) -> None: self._event_disconnect.set() self._event_connect.clear() - def __on_message(self, client, user_data, msg) -> None: + def __on_message(self, client:Client, user_data:Any , msg:MQTTMessage) -> None: self._on_mips_message(topic=msg.topic, payload=msg.payload) - def __mips_try_reconnect(self, immediately: bool = False) -> None: - if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) - self._mips_reconnect_timer = None - if not self._mips_reconnect_tag: - return - interval: int = 0 - if not immediately: - interval = self.__get_next_reconnect_time() - self.log_error( - 'mips try reconnect after %sms', interval) - self._mips_reconnect_timer = self.mev_set_timeout( - interval, self.__mips_connect, None) - def __mips_sub_internal_pending_handler(self, ctx: Any) -> None: subbed_count = 1 for topic in list(self._mips_sub_pending_map.keys()): @@ -916,25 +680,25 @@ def __mips_sub_internal_pending_handler(self, ctx: Any) -> None: f'retry mips sub internal, {count}, {topic}, {result}, {mid}') if len(self._mips_sub_pending_map): - self._mips_sub_pending_timer = self.mev_set_timeout( + self._mips_sub_pending_timer = self._internal_loop.call_later( self.MIPS_SUB_INTERVAL, self.__mips_sub_internal_pending_handler, None) else: self._mips_sub_pending_timer = None - def __mips_connect(self, ctx: Any = None) -> None: + def __mips_connect(self) -> None: result = MQTT_ERR_UNKNOWN if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) + self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None try: # Try clean mqtt fd before mqtt connect if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 result = self._mqtt.connect( host=self._host, port=self._port, @@ -944,33 +708,73 @@ def __mips_connect(self, ctx: Any = None) -> None: self.log_error('__mips_connect, connect error, %s', error) if result == MQTT_ERR_SUCCESS: - self._mqtt_fd = self._mqtt.socket() + socket = self._mqtt.socket() + if socket is None: + self.log_error('__mips_connect, connect success, but socket is None') + self.__mips_try_reconnect() + return + self._mqtt_fd = socket.fileno() self.log_debug(f'__mips_connect, _mqtt_fd, {self._mqtt_fd}') - self.mev_set_read_handler( - self._mqtt_fd, self.__mqtt_read_handler, None) + self._internal_loop.add_reader( + self._mqtt_fd, self.__mqtt_read_handler) if self._mqtt.want_write(): - self.mev_set_write_handler( - self._mqtt_fd, self.__mqtt_write_handler, None) - self._mqtt_timer = self.mev_set_timeout( - self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None) + self._internal_loop.add_writer( + self._mqtt_fd, self.__mqtt_write_handler) + self._mqtt_timer = self._internal_loop.call_later( + self.MQTT_INTERVAL_S, self.__mqtt_timer_handler) else: self.log_error(f'__mips_connect error result, {result}') self.__mips_try_reconnect() + + def __mips_try_reconnect(self, immediately: bool = False) -> None: + if self._mips_reconnect_timer: + self._mips_reconnect_timer.cancel() + self._mips_reconnect_timer = None + if not self._mips_reconnect_tag: + return + interval: float = 0 + if not immediately: + interval = self.__get_next_reconnect_time() + self.log_error( + 'mips try reconnect after %ss', interval) + self._mips_reconnect_timer = self._internal_loop.call_later( + interval, self.__mips_connect) + + def __mips_start_connect_tries(self) -> None: + self._mips_reconnect_tag = True + self.__mips_try_reconnect(immediately=True) def __mips_disconnect(self) -> None: + self._mips_reconnect_tag = False if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) + self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 self._mqtt.disconnect() - def __get_next_reconnect_time(self) -> int: + def __mips_close(self) -> None: + self.log_info('mips client closing') + self.__mips_disconnect() + self._internal_loop.stop() + + def __sub_mips_state( + self, key: str, handler: Callable[[str, bool], Coroutine] + ) -> None: + state = MipsState(key=key, handler=handler) + self._mips_state_sub_map[key] = state + self.log_debug(f'mips register mips state, {key}') + + def __unsub_mips_state(self, key: str) -> None: + del self._mips_state_sub_map[key] + self.log_debug(f'mips unregister mips state, {key}') + + def __get_next_reconnect_time(self) -> float: if self._mips_reconnect_interval == 0: self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN else: @@ -996,22 +800,6 @@ def __init__( client_id=f'ha.{uuid}', host=f'{cloud_server}-ha.mqtt.io.mi.com', port=port, username=app_id, password=token, loop=loop) - self.on_mips_cmd = self.__on_mips_cmd_handler - self.on_mips_message = self.__on_mips_message_handler - self.on_mips_connect = self.__on_mips_connect_handler - self.on_mips_disconnect = self.__on_mips_disconnect_handler - - def deinit(self) -> None: - self.mips_deinit() - self._msg_matcher = None - self.on_mips_cmd = None - self.on_mips_message = None - self.on_mips_connect = None - - @final - def connect(self) -> None: - self.mips_connect() - @final async def connect_async(self) -> None: await self.mips_connect_async() @@ -1029,12 +817,17 @@ async def disconnect_async(self) -> None: def update_access_token(self, access_token: str) -> bool: if not isinstance(access_token, str): raise MIoTMipsError('invalid token') - return self.update_mqtt_password(password=access_token) + self.update_mqtt_password(password=access_token) + return True @final def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') @@ -1043,7 +836,7 @@ def sub_prop( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - def on_prop_msg(topic: str, payload: str, ctx: Any) -> bool: + def on_prop_msg(topic: str, payload: str, ctx: Any) -> None: try: msg: dict = json.loads(payload) except json.JSONDecodeError: @@ -1062,22 +855,31 @@ def on_prop_msg(topic: str, payload: str, ctx: Any) -> bool: if handler: self.log_debug('on properties_changed, %s', payload) handler(msg['params'], ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final - def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: + def unsub_prop( + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None + ) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = ( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') @@ -1086,7 +888,7 @@ def sub_event( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - def on_event_msg(topic: str, payload: str, ctx: Any) -> bool: + def on_event_msg(topic: str, payload: str, ctx: Any) -> None: try: msg: dict = json.loads(payload) except json.JSONDecodeError: @@ -1106,18 +908,23 @@ def on_event_msg(topic: str, payload: str, ctx: Any) -> bool: self.log_debug('on on_event_msg, %s', payload) msg['params']['from'] = 'cloud' handler(msg['params'], ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final - def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: + def unsub_event( + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None + ) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') # Spelling error: event_occured topic: str = ( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_device_state( @@ -1145,7 +952,7 @@ def on_state_msg(topic: str, payload: str, ctx: Any) -> None: handler( did, MIoTDeviceState.ONLINE if msg['event'] == 'online' else MIoTDeviceState.OFFLINE, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_state_msg, handler_ctx=handler_ctx) @final @@ -1153,10 +960,10 @@ def unsub_device_state(self, did: str) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = f'device/{did}/state/#' - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: raise NotImplementedError('please call in http client') @@ -1168,60 +975,57 @@ async def get_prop_async( async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 - ) -> bool: + ) -> dict: raise NotImplementedError('please call in http client') async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 - ) -> tuple[bool, list]: + ) -> dict: raise NotImplementedError('please call in http client') - def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None: - """ - NOTICE thread safe, this function will be called at the **mips** thread - """ - if mips_cmd.type_ == MipsCmdType.REG_BROADCAST: - reg_bc: MipsRegBroadcast = mips_cmd.data - if not self._msg_matcher.get(topic=reg_bc.topic): - sub_bc: MipsBroadcast = MipsBroadcast( - topic=reg_bc.topic, handler=reg_bc.handler, - handler_ctx=reg_bc.handler_ctx) - self._msg_matcher[reg_bc.topic] = sub_bc - self._mips_sub_internal(topic=reg_bc.topic) - else: - self.log_debug(f'mips cloud re-reg broadcast, {reg_bc.topic}') - elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST: - unreg_bc: MipsRegBroadcast = mips_cmd.data - if self._msg_matcher.get(topic=unreg_bc.topic): - del self._msg_matcher[unreg_bc.topic] - self._mips_unsub_internal(topic=unreg_bc.topic) - - def __reg_broadcast( + def __reg_broadcast_external( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any = None ) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.REG_BROADCAST, - data=MipsRegBroadcast( - topic=topic, handler=handler, handler_ctx=handler_ctx)) + self._internal_loop.call_soon_threadsafe( + self.__reg_broadcast, topic, handler, handler_ctx) + return True - def __unreg_broadcast(self, topic: str) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_BROADCAST, - data=MipsRegBroadcast(topic=topic)) + def __unreg_broadcast_external(self, topic: str) -> bool: + self._internal_loop.call_soon_threadsafe( + self.__unreg_broadcast, topic) + return True - def __on_mips_connect_handler(self, rc, props) -> None: + def __reg_broadcast( + self, topic: str, handler: Callable[[str, str, Any], None], + handler_ctx: Any = None + ) -> None: + if not self._msg_matcher.get(topic=topic): + sub_bc: MipsBroadcast = MipsBroadcast( + topic=topic, handler=handler, + handler_ctx=handler_ctx) + self._msg_matcher[topic] = sub_bc + self._mips_sub_internal(topic=topic) + else: + self.log_debug(f'mips cloud re-reg broadcast, {topic}') + + def __unreg_broadcast(self, topic: str) -> None: + if self._msg_matcher.get(topic=topic): + del self._msg_matcher[topic] + self._mips_unsub_internal(topic=topic) + + def _on_mips_connect(self, rc: int, props: dict) -> None: """sub topic.""" for topic, _ in list( self._msg_matcher.iter_all_nodes()): self._mips_sub_internal(topic=topic) - def __on_mips_disconnect_handler(self, rc, props) -> None: + def _on_mips_disconnect(self, rc: int, props: dict) -> None: """unsub topic.""" pass - def __on_mips_message_handler(self, topic: str, payload) -> None: + def _on_mips_message(self, topic: str, payload: bytes) -> None: """ NOTICE thread safe, this function will be called at the **mips** thread """ @@ -1230,23 +1034,25 @@ def __on_mips_message_handler(self, topic: str, payload) -> None: self._msg_matcher.iter_match(topic)) if not bc_list: return + # The message from the cloud is not packed. + payload_str:str = payload.decode('utf-8') # self.log_debug(f"on broadcast, {topic}, {payload}") for item in bc_list or []: if item.handler is None: continue # NOTICE: call threadsafe self.main_loop.call_soon_threadsafe( - item.handler, topic, payload, item.handler_ctx) + item.handler, topic, payload_str, item.handler_ctx) class MipsLocalClient(MipsClient): """MIoT Pub/Sub Local Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes - MIPS_RECONNECT_INTERVAL_MIN: int = 6000 - MIPS_RECONNECT_INTERVAL_MAX: int = 60000 + MIPS_RECONNECT_INTERVAL_MIN: float = 6 + MIPS_RECONNECT_INTERVAL_MAX: float = 60 MIPS_SUB_PATCH: int = 1000 - MIPS_SUB_INTERVAL: int = 100 + MIPS_SUB_INTERVAL: float = 0.1 _did: str _group_id: str _home_name: str @@ -1255,10 +1061,9 @@ class MipsLocalClient(MipsClient): _dev_list_change_topic: str _request_map: dict[str, MipsRequest] _msg_matcher: MIoTMatcher - _device_state_sub_map: dict[str, MipsDeviceState] _get_prop_queue: dict[str, list] - _get_prop_timer: asyncio.TimerHandle - _on_dev_list_changed: Callable[[Any, list[str]], asyncio.Future] + _get_prop_timer: asyncio.TimerHandle | None + _on_dev_list_changed: Callable[[Any, list[str]], Coroutine] | None def __init__( self, did: str, host: str, group_id: str, @@ -1274,7 +1079,6 @@ def __init__( self._dev_list_change_topic = f'{did}/appMsg/devListChange' self._request_map = {} self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} self._get_prop_queue = {} self._get_prop_timer = None self._on_dev_list_changed = None @@ -1285,31 +1089,10 @@ def __init__( # MIPS local thread name use group_id self._mips_thread.name = self._group_id - self.on_mips_cmd = self.__on_mips_cmd_handler - self.on_mips_message = self.__on_mips_message_handler - self.on_mips_connect = self.__on_mips_connect_handler - @property def group_id(self) -> str: return self._group_id - def deinit(self) -> None: - self.mips_deinit() - self._did = None - self._mips_seed_id = None - self._reply_topic = None - self._dev_list_change_topic = None - self._request_map = None - self._msg_matcher = None - self._device_state_sub_map = None - self._get_prop_queue = None - self._get_prop_timer = None - self._on_dev_list_changed = None - - self.on_mips_cmd = None - self.on_mips_message = None - self.on_mips_connect = None - def log_debug(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.debug(f'{self._home_name}, '+msg, *args, **kwargs) @@ -1322,10 +1105,6 @@ def log_error(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs) - @final - def connect(self) -> None: - self.mips_connect() - @final async def connect_async(self) -> None: await self.mips_connect_async() @@ -1335,19 +1114,21 @@ def disconnect(self) -> None: self.mips_disconnect() self._request_map = {} self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} @final async def disconnect_async(self) -> None: await self.mips_disconnect_async() self._request_map = {} self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} @final def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' @@ -1367,20 +1148,29 @@ def on_prop_msg(topic: str, payload: str, ctx: Any): if handler: self.log_debug('local, on properties_changed, %s', payload) handler(msg, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final - def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: + def unsub_prop( + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None + ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' @@ -1400,15 +1190,20 @@ def on_event_msg(topic: str, payload: str, ctx: Any): if handler: self.log_debug('local, on event_occurred, %s', payload) handler(msg, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final - def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: + def unsub_event( + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None + ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final async def get_prop_safe_async( @@ -1426,7 +1221,9 @@ async def get_prop_safe_async( 'timeout_ms': timeout_ms }) if self._get_prop_timer is None: - self._get_prop_timer = self.main_loop.create_task( + self._get_prop_timer = self.main_loop.call_later( + 0.1, + self.main_loop.create_task, self.__get_prop_timer_handle()) return await fut @@ -1515,13 +1312,13 @@ async def action_async( @final async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: result_obj = await self.__request_async( topic='proxy/getDevList', payload=payload or '{}', timeout_ms=timeout_ms) if not result_obj or 'devList' not in result_obj: - return None + raise MIoTMipsError('invalid result') device_list = {} for did, info in result_obj['devList'].items(): name: str = info.get('name', None) @@ -1557,7 +1354,7 @@ async def get_action_group_list_async( payload='{}', timeout_ms=timeout_ms) if not result_obj or 'result' not in result_obj: - return None + raise MIoTMipsError('invalid result') return result_obj['result'] @final @@ -1579,79 +1376,71 @@ async def exec_action_group_list_async( @final @property - def on_dev_list_changed(self) -> Callable[[Any, list[str]], asyncio.Future]: + def on_dev_list_changed(self) -> Callable[[Any, list[str]], Coroutine] | None: return self._on_dev_list_changed @final @on_dev_list_changed.setter def on_dev_list_changed( - self, func: Callable[[Any, list[str]], asyncio.Future] + self, func: Callable[[Any, list[str]], Coroutine] ) -> None: """run in main loop.""" self._on_dev_list_changed = func - @final - def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None: - if mips_cmd.type_ == MipsCmdType.CALL_API: - req_data: MipsRequestData = mips_cmd.data - req = MipsRequest() - req.mid = self.__gen_mips_id - req.on_reply = req_data.on_reply - req.on_reply_ctx = req_data.on_reply_ctx - pub_topic: str = f'master/{req_data.topic}' - result = self.__mips_publish( - topic=pub_topic, payload=req_data.payload, mid=req.mid, - ret_topic=self._reply_topic) - self.log_debug( - f'mips local call api, {result}, {req.mid}, {pub_topic}, ' - f'{req_data.payload}') - - def on_request_timeout(req: MipsRequest): - self.log_error( - f'on mips request timeout, {req.mid}, {pub_topic}' - f', {req_data.payload}') - self._request_map.pop(str(req.mid), None) - req.on_reply( - '{"error":{"code":-10006, "message":"timeout"}}', - req.on_reply_ctx) - req.timer = self.mev_set_timeout( - req_data.timeout_ms, on_request_timeout, req) - self._request_map[str(req.mid)] = req - elif mips_cmd.type_ == MipsCmdType.REG_BROADCAST: - reg_bc: MipsRegBroadcast = mips_cmd.data - sub_topic: str = f'{self._did}/{reg_bc.topic}' - if not self._msg_matcher.get(sub_topic): - sub_bc: MipsBroadcast = MipsBroadcast( - topic=sub_topic, handler=reg_bc.handler, - handler_ctx=reg_bc.handler_ctx) - self._msg_matcher[sub_topic] = sub_bc - self._mips_sub_internal(topic=f'master/{reg_bc.topic}') - else: - self.log_debug(f'mips re-reg broadcast, {sub_topic}') - elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST: - unreg_bc: MipsRegBroadcast = mips_cmd.data - # Central hub gateway needs to add prefix - unsub_topic: str = f'{self._did}/{unreg_bc.topic}' - if self._msg_matcher.get(unsub_topic): - del self._msg_matcher[unsub_topic] - self._mips_unsub_internal( - topic=re.sub(f'^{self._did}', 'master', unsub_topic)) - elif mips_cmd.type_ == MipsCmdType.REG_DEVICE_STATE: - reg_dev_state: MipsRegDeviceState = mips_cmd.data - self._device_state_sub_map[reg_dev_state.did] = reg_dev_state - self.log_debug( - f'mips local reg device state, {reg_dev_state.did}') - elif mips_cmd.type_ == MipsCmdType.UNREG_DEVICE_STATE: - unreg_dev_state: MipsRegDeviceState = mips_cmd.data - del self._device_state_sub_map[unreg_dev_state.did] - self.log_debug( - f'mips local unreg device state, {unreg_dev_state.did}') - else: + def __request( + self, topic: str, payload: str, + on_reply: Callable[[str, Any], None], + on_reply_ctx: Any = None, timeout_ms: int = 10000 + ) -> None: + req = MipsRequest( + mid=self.__gen_mips_id, + on_reply=on_reply, + on_reply_ctx=on_reply_ctx, + timer=None) + pub_topic: str = f'master/{topic}' + result = self.__mips_publish( + topic=pub_topic, payload=payload, mid=req.mid, + ret_topic=self._reply_topic) + self.log_debug( + f'mips local call api, {result}, {req.mid}, {pub_topic}, ' + f'{payload}') + + def on_request_timeout(req: MipsRequest): self.log_error( - f'mips local recv unknown cmd, {mips_cmd.type_}, ' - f'{mips_cmd.data}') + f'on mips request timeout, {req.mid}, {pub_topic}' + f', {payload}') + self._request_map.pop(str(req.mid), None) + req.on_reply( + '{"error":{"code":-10006, "message":"timeout"}}', + req.on_reply_ctx) + req.timer = self._internal_loop.call_later( + timeout_ms/1000, on_request_timeout, req) + self._request_map[str(req.mid)] = req - def __on_mips_connect_handler(self, rc, props) -> None: + def __reg_broadcast( + self, topic: str, handler: Callable[[str, str, Any], None], + handler_ctx: Any + ) -> None: + sub_topic: str = f'{self._did}/{topic}' + if not self._msg_matcher.get(sub_topic): + sub_bc: MipsBroadcast = MipsBroadcast( + topic=sub_topic, handler=handler, + handler_ctx=handler_ctx) + self._msg_matcher[sub_topic] = sub_bc + self._mips_sub_internal(topic=f'master/{topic}') + else: + self.log_debug(f'mips re-reg broadcast, {sub_topic}') + + def __unreg_broadcast(self, topic) -> None: + # Central hub gateway needs to add prefix + unsub_topic: str = f'{self._did}/{topic}' + if self._msg_matcher.get(unsub_topic): + del self._msg_matcher[unsub_topic] + self._mips_unsub_internal( + topic=re.sub(f'^{self._did}', 'master', unsub_topic)) + + @final + def _on_mips_connect(self, rc: int, props: dict) -> None: self.log_debug('__on_mips_connect_handler') # Sub did/#, include reply topic self._mips_sub_internal(f'{self._did}/#') @@ -1665,17 +1454,18 @@ def __on_mips_connect_handler(self, rc, props) -> None: topic=re.sub(f'^{self._did}', 'master', topic)) @final - def __on_mips_message_handler(self, topic: str, payload: bytes) -> None: + def _on_mips_message(self, topic: str, payload: bytes) -> None: mips_msg: MipsMessage = MipsMessage.unpack(payload) # self.log_debug( # f"mips local client, on_message, {topic} -> {mips_msg}") # Reply if topic == self._reply_topic: self.log_debug(f'on request reply, {mips_msg}') - req: MipsRequest = self._request_map.pop(str(mips_msg.mid), None) + req: MipsRequest | None = self._request_map.pop(str(mips_msg.mid), None) if req: # Cancel timer - self.mev_clear_timeout(req.timer) + if req.timer: + req.timer.cancel() if req.on_reply: self.main_loop.call_soon_threadsafe( req.on_reply, mips_msg.payload or '{}', @@ -1695,6 +1485,9 @@ def __on_mips_message_handler(self, topic: str, payload: bytes) -> None: return # Device list change if topic == self._dev_list_change_topic: + if mips_msg.payload is None: + self.log_error('devListChange msg is None') + return payload_obj: dict = json.loads(mips_msg.payload) dev_list = payload_obj.get('devList', None) if not isinstance(dev_list, list) or not dev_list: @@ -1704,7 +1497,7 @@ def __on_mips_message_handler(self, topic: str, payload: bytes) -> None: if self._on_dev_list_changed: self.main_loop.call_soon_threadsafe( self.main_loop.create_task, - self._on_dev_list_changed(self, payload_obj['devList'])) + self._on_dev_list_changed(self, dev_list)) return self.log_debug( @@ -1717,9 +1510,13 @@ def __gen_mips_id(self) -> int: return mips_id def __mips_publish( - self, topic: str, payload: str | bytes, mid: int = None, - ret_topic: str = None, wait_for_publish: bool = False, - timeout_ms: int = 10000 + self, + topic: str, + payload: str, + mid: Optional[int] = None, + ret_topic: Optional[str] = None, + wait_for_publish: bool = False, + timeout_ms: int = 10000 ) -> bool: mips_msg: bytes = MipsMessage.pack( mid=mid or self.__gen_mips_id, payload=payload, @@ -1728,34 +1525,30 @@ def __mips_publish( topic=topic.strip(), payload=mips_msg, wait_for_publish=wait_for_publish, timeout_ms=timeout_ms) - def __request( + def __request_external( self, topic: str, payload: str, on_reply: Callable[[str, Any], None], on_reply_ctx: Any = None, timeout_ms: int = 10000 ) -> bool: if topic is None or payload is None or on_reply is None: raise MIoTMipsError('invalid params') - req_data: MipsRequestData = MipsRequestData() - req_data.topic = topic - req_data.payload = payload - req_data.on_reply = on_reply - req_data.on_reply_ctx = on_reply_ctx - req_data.timeout_ms = timeout_ms - return self._mips_send_cmd(type_=MipsCmdType.CALL_API, data=req_data) + self._internal_loop.call_soon_threadsafe( + self.__request, topic, payload, on_reply, on_reply_ctx, timeout_ms) + return True - def __reg_broadcast( + def __reg_broadcast_external( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any ) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.REG_BROADCAST, - data=MipsRegBroadcast( - topic=topic, handler=handler, handler_ctx=handler_ctx)) + self._internal_loop.call_soon_threadsafe( + self.__reg_broadcast, + topic, handler, handler_ctx) + return True - def __unreg_broadcast(self, topic) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_BROADCAST, - data=MipsRegBroadcast(topic=topic)) + def __unreg_broadcast_external(self, topic) -> bool: + self._internal_loop.call_soon_threadsafe( + self.__unreg_broadcast, topic) + return True @final async def __request_async( @@ -1767,7 +1560,7 @@ def on_msg_reply(payload: str, ctx: Any): fut: asyncio.Future = ctx if fut: self.main_loop.call_soon_threadsafe(fut.set_result, payload) - if not self.__request( + if not self.__request_external( topic=topic, payload=payload, on_reply=on_msg_reply,