From 32f1fe3ce14031b0d25d86bc0fdec64b0db69898 Mon Sep 17 00:00:00 2001 From: GitHub Actions Bot Date: Fri, 1 Nov 2024 01:29:34 +0000 Subject: [PATCH] Automated version bump and other updates --- docs/source/wrapyfi.listeners.rst | 8 ++ docs/source/wrapyfi.middlewares.rst | 8 ++ docs/source/wrapyfi.publishers.rst | 8 ++ pyproject.toml | 2 +- setup.py | 13 +++- wrapyfi/listeners/zenoh.py | 110 ++++++++++++++++++++++------ wrapyfi/middlewares/zenoh.py | 6 +- wrapyfi/publishers/zenoh.py | 76 +++++++++++++------ 8 files changed, 183 insertions(+), 48 deletions(-) diff --git a/docs/source/wrapyfi.listeners.rst b/docs/source/wrapyfi.listeners.rst index fad5f6c..3c28a2a 100644 --- a/docs/source/wrapyfi.listeners.rst +++ b/docs/source/wrapyfi.listeners.rst @@ -44,6 +44,14 @@ wrapyfi.listeners.yarp module :undoc-members: :show-inheritance: +wrapyfi.listeners.zenoh module +------------------------------ + +.. automodule:: wrapyfi.listeners.zenoh + :members: + :undoc-members: + :show-inheritance: + wrapyfi.listeners.zeromq module ------------------------------- diff --git a/docs/source/wrapyfi.middlewares.rst b/docs/source/wrapyfi.middlewares.rst index d5b3e32..7cff784 100644 --- a/docs/source/wrapyfi.middlewares.rst +++ b/docs/source/wrapyfi.middlewares.rst @@ -44,6 +44,14 @@ wrapyfi.middlewares.yarp module :undoc-members: :show-inheritance: +wrapyfi.middlewares.zenoh module +-------------------------------- + +.. automodule:: wrapyfi.middlewares.zenoh + :members: + :undoc-members: + :show-inheritance: + wrapyfi.middlewares.zeromq module --------------------------------- diff --git a/docs/source/wrapyfi.publishers.rst b/docs/source/wrapyfi.publishers.rst index e181c1f..acba06d 100644 --- a/docs/source/wrapyfi.publishers.rst +++ b/docs/source/wrapyfi.publishers.rst @@ -44,6 +44,14 @@ wrapyfi.publishers.yarp module :undoc-members: :show-inheritance: +wrapyfi.publishers.zenoh module +------------------------------- + +.. automodule:: wrapyfi.publishers.zenoh + :members: + :undoc-members: + :show-inheritance: + wrapyfi.publishers.zeromq module -------------------------------- diff --git a/pyproject.toml b/pyproject.toml index 2b43c22..f0b5c7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "wrapyfi" -version = "0.4.50" +version = "0.4.51" description = "Wrapyfi is a wrapper for simplifying Middleware communication" readme = "README.md" authors = [ diff --git a/setup.py b/setup.py index 30bf784..1228fc8 100755 --- a/setup.py +++ b/setup.py @@ -72,7 +72,18 @@ def check_cv2(default_python="opencv-python"): + check_cv2("opencv-python-headless"), "headless_mqtt": ["wrapyfi[mqtt]", "wrapyfi[numpy]"] + check_cv2("opencv-python-headless"), - "complete": ["wrapyfi[numpy]", "sounddevice", "soundfile", "Pillow", "pandas", "wrapyfi[pyzmq]", "wrapyfi[websocket]", "wrapyfi[zenoh]", "wrapyfi[mqtt]", "wrapyfi[docs]"] + "complete": [ + "wrapyfi[numpy]", + "sounddevice", + "soundfile", + "Pillow", + "pandas", + "wrapyfi[pyzmq]", + "wrapyfi[websocket]", + "wrapyfi[zenoh]", + "wrapyfi[mqtt]", + "wrapyfi[docs]", + ] + check_cv2("opencv-contrib-python"), "all": ["wrapyfi[numpy]", "wrapyfi[pyzmq]"] + check_cv2("opencv-contrib-python"), diff --git a/wrapyfi/listeners/zenoh.py b/wrapyfi/listeners/zenoh.py index ec798e9..fbe6e9f 100644 --- a/wrapyfi/listeners/zenoh.py +++ b/wrapyfi/listeners/zenoh.py @@ -21,7 +21,9 @@ ZENOH_CONNECT = json.loads(os.getenv("WRAPYFI_ZENOH_CONNECT", "[]")) ZENOH_LISTEN = json.loads(os.getenv("WRAPYFI_ZENOH_LISTEN", "[]")) ZENOH_CONFIG_FILEPATH = os.getenv("WRAPYFI_ZENOH_CONFIG_FILEPATH", None) -ZENOH_MONITOR_LISTENER_SPAWN = os.getenv("WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread") +ZENOH_MONITOR_LISTENER_SPAWN = os.getenv( + "WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread" +) WATCHDOG_POLL_INTERVAL = float(os.getenv("WRAPYFI_ZENOH_RETRY_INTERVAL", 0.2)) WATCHDOG_POLL_REPEATS = int(os.getenv("WRAPYFI_ZENOH_MAX_REPEATS", -1)) @@ -41,8 +43,17 @@ class ZenohListener(Listener): Merges listener-specific settings and environment configurations, and awaits connection. """ - def __init__(self, name: str, in_topic: str, should_wait: bool = True, - ip: str = ZENOH_IP, port: int = ZENOH_PORT, mode: str = ZENOH_MODE, zenoh_kwargs: Optional[dict] = None, **kwargs): + def __init__( + self, + name: str, + in_topic: str, + should_wait: bool = True, + ip: str = ZENOH_IP, + port: int = ZENOH_PORT, + mode: str = ZENOH_MODE, + zenoh_kwargs: Optional[dict] = None, + **kwargs, + ): """ Initializes the Zenoh listener with environment or parameter-based configurations and waits for connection if specified. @@ -64,13 +75,27 @@ def __init__(self, name: str, in_topic: str, should_wait: bool = True, # Prepare Zenoh configuration from environment variables and kwargs self.zenoh_config = { "mode": mode, - "connect/endpoints": ZENOH_CONNECT if isinstance(ZENOH_CONNECT, list) else ZENOH_CONNECT.split(",") if isinstance(ZENOH_CONNECT, str) else [f"tcp/{ip}:{port}"], - **(zenoh_kwargs or {}) + "connect/endpoints": ( + ZENOH_CONNECT + if isinstance(ZENOH_CONNECT, list) + else ( + ZENOH_CONNECT.split(",") + if isinstance(ZENOH_CONNECT, str) + else [f"tcp/{ip}:{port}"] + ) + ), + **(zenoh_kwargs or {}), } if ZENOH_LISTEN: - self.zenoh_config["listen/endpoints"] = ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") + self.zenoh_config["listen/endpoints"] = ( + ZENOH_LISTEN + if isinstance(ZENOH_LISTEN, list) + else ZENOH_LISTEN.split(",") + ) - ZenohMiddlewarePubSub.activate(config=self._prepare_config(self.zenoh_config), **kwargs) + ZenohMiddlewarePubSub.activate( + config=self._prepare_config(self.zenoh_config), **kwargs + ) self.established = False @@ -81,12 +106,18 @@ def _prepare_config(self, zenoh_kwargs): :param zenoh_kwargs: dict: Configuration parameters :return: zenoh.Config: Configured Zenoh session """ - config = zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH) if ZENOH_CONFIG_FILEPATH else zenoh.Config() + config = ( + zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH) + if ZENOH_CONFIG_FILEPATH + else zenoh.Config() + ) for key, value in zenoh_kwargs.items(): config.insert_json5(key, json.dumps(value)) return config - def await_connection(self, in_topic: Optional[str] = None, repeats: int = WATCHDOG_POLL_REPEATS): + def await_connection( + self, in_topic: Optional[str] = None, repeats: int = WATCHDOG_POLL_REPEATS + ): """ Waits for the Zenoh connection to be established. @@ -114,7 +145,9 @@ def establish(self, repeats: Optional[int] = None, **kwargs): established = self.await_connection(repeats=repeats) established = self.check_establishment(established) if established: - ZenohMiddlewarePubSub._instance.register_callback(self.in_topic, self.on_message) + ZenohMiddlewarePubSub._instance.register_callback( + self.in_topic, self.on_message + ) return established def close(self): @@ -137,7 +170,14 @@ class ZenohNativeObjectListener(ZenohListener): :param deserializer_kwargs: dict: Keyword arguments for the JSON deserializer """ - def __init__(self, name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: Optional[dict] = None, **kwargs): + def __init__( + self, + name: str, + in_topic: str, + should_wait: bool = True, + deserializer_kwargs: Optional[dict] = None, + **kwargs, + ): super().__init__(name, in_topic, should_wait=should_wait, **kwargs) self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook self._message_queue = queue.Queue() @@ -150,7 +190,11 @@ def on_message(self, sample): :param sample: zenoh.Sample: The Zenoh sample received """ try: - obj = json.loads(sample.payload.to_bytes(), object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs) + obj = json.loads( + sample.payload.to_bytes(), + object_hook=self._plugin_decoder_hook, + **self._deserializer_kwargs, + ) self._message_queue.put(obj) logging.debug(f"Queued message for topic {self.in_topic}: {obj}") except json.JSONDecodeError as e: @@ -188,7 +232,17 @@ class ZenohImageListener(ZenohNativeObjectListener): :param jpg: bool: True if the image is JPEG-compressed """ - def __init__(self, name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, jpg: bool = False, **kwargs): + def __init__( + self, + name: str, + in_topic: str, + should_wait: bool = True, + width: int = -1, + height: int = -1, + rgb: bool = True, + jpg: bool = False, + **kwargs, + ): super().__init__(name, in_topic, should_wait=should_wait, **kwargs) self.width = width self.height = height @@ -205,15 +259,21 @@ def on_message(self, sample): try: # Split payload into header and image data payload = sample.payload.to_bytes() - header_bytes, img_bytes = payload.split(b'\n', 1) # Split at the first newline + header_bytes, img_bytes = payload.split( + b"\n", 1 + ) # Split at the first newline - header = json.loads(header_bytes.decode('utf-8')) + header = json.loads(header_bytes.decode("utf-8")) np_data = np.frombuffer(img_bytes, dtype=np.uint8) if self.jpg: - img = cv2.imdecode(np_data, cv2.IMREAD_COLOR if self.rgb else cv2.IMREAD_GRAYSCALE) + img = cv2.imdecode( + np_data, cv2.IMREAD_COLOR if self.rgb else cv2.IMREAD_GRAYSCALE + ) else: - shape = header.get("shape", (self.height, self.width, 3 if self.rgb else 1)) + shape = header.get( + "shape", (self.height, self.width, 3 if self.rgb else 1) + ) img = np_data.reshape(shape) # Place the decoded image into the message queue @@ -252,7 +312,16 @@ class ZenohAudioChunkListener(ZenohNativeObjectListener): :param chunk: int: Number of samples in the audio chunk """ - def __init__(self, name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs): + def __init__( + self, + name: str, + in_topic: str, + should_wait: bool = True, + channels: int = 1, + rate: int = 44100, + chunk: int = -1, + **kwargs, + ): super().__init__(name, in_topic, should_wait=should_wait, **kwargs) self.channels = channels self.rate = rate @@ -267,8 +336,8 @@ def on_message(self, sample): """ try: payload = sample.payload.to_bytes() - header_bytes, aud_bytes = payload.split(b'\n', 1) - header = json.loads(header_bytes.decode('utf-8')) + header_bytes, aud_bytes = payload.split(b"\n", 1) + header = json.loads(header_bytes.decode("utf-8")) shape = header.get("shape") rate = header.get("rate") aud_array = np.frombuffer(aud_bytes, dtype=np.float32).reshape(shape) @@ -291,4 +360,3 @@ def listen(self): return self._message_queue.get(block=self.should_wait) except queue.Empty: return None, self.rate - diff --git a/wrapyfi/middlewares/zenoh.py b/wrapyfi/middlewares/zenoh.py index 9f2f6d5..4f86fe5 100644 --- a/wrapyfi/middlewares/zenoh.py +++ b/wrapyfi/middlewares/zenoh.py @@ -32,7 +32,9 @@ def activate(config: zenoh.Config = None, **kwargs): """ zenoh.init_log_from_env_or("error") if ZenohMiddlewarePubSub._instance is None: - ZenohMiddlewarePubSub._instance = ZenohMiddlewarePubSub(config=config, **kwargs) + ZenohMiddlewarePubSub._instance = ZenohMiddlewarePubSub( + config=config, **kwargs + ) return ZenohMiddlewarePubSub._instance def __init__(self, config: zenoh.Config = None, **kwargs): @@ -75,7 +77,6 @@ def register_callback(self, topic: str, callback): self.subscribers[topic] = self.session.declare_subscriber(topic, callback) logging.info(f"[ZenohMiddlewarePubSub] Registered callback for topic {topic}") - def is_connected(self) -> bool: """ Checks if the Zenoh session is active. @@ -90,4 +91,3 @@ def deinit(self): """ logging.info("[ZenohMiddlewarePubSub] Closing Zenoh session") self.session.close() - diff --git a/wrapyfi/publishers/zenoh.py b/wrapyfi/publishers/zenoh.py index 6eb23b9..92a71b4 100644 --- a/wrapyfi/publishers/zenoh.py +++ b/wrapyfi/publishers/zenoh.py @@ -21,7 +21,9 @@ ZENOH_CONNECT = json.loads(os.getenv("WRAPYFI_ZENOH_CONNECT", "[]")) ZENOH_LISTEN = json.loads(os.getenv("WRAPYFI_ZENOH_LISTEN", "[]")) ZENOH_CONFIG_FILEPATH = os.getenv("WRAPYFI_ZENOH_CONFIG_FILEPATH", None) -ZENOH_MONITOR_LISTENER_SPAWN = os.getenv("WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread") +ZENOH_MONITOR_LISTENER_SPAWN = os.getenv( + "WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread" +) WATCHDOG_POLL_INTERVAL = float(os.getenv("WRAPYFI_ZENOH_RETRY_INTERVAL", 0.2)) WATCHDOG_POLL_REPEATS = int(os.getenv("WRAPYFI_ZENOH_MAX_REPEATS", -1)) @@ -71,13 +73,27 @@ def __init__( # Prepare Zenoh configuration self.zenoh_config = { "mode": mode, - "connect/endpoints": ZENOH_CONNECT if isinstance(ZENOH_CONNECT, list) else ZENOH_CONNECT.split(",") if isinstance(ZENOH_CONNECT, str) else [f"tcp/{ip}:{port}"], - **(zenoh_kwargs or {}) + "connect/endpoints": ( + ZENOH_CONNECT + if isinstance(ZENOH_CONNECT, list) + else ( + ZENOH_CONNECT.split(",") + if isinstance(ZENOH_CONNECT, str) + else [f"tcp/{ip}:{port}"] + ) + ), + **(zenoh_kwargs or {}), } if ZENOH_LISTEN: - self.zenoh_config["listen/endpoints"] = ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") - - ZenohMiddlewarePubSub.activate(config=self._prepare_config(self.zenoh_config), **kwargs) + self.zenoh_config["listen/endpoints"] = ( + ZENOH_LISTEN + if isinstance(ZENOH_LISTEN, list) + else ZENOH_LISTEN.split(",") + ) + + ZenohMiddlewarePubSub.activate( + config=self._prepare_config(self.zenoh_config), **kwargs + ) def _prepare_config(self, zenoh_kwargs): """ @@ -86,12 +102,18 @@ def _prepare_config(self, zenoh_kwargs): :param zenoh_kwargs: dict: Configuration parameters :return: zenoh.Config: Configured Zenoh session """ - config = zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH) if ZENOH_CONFIG_FILEPATH else zenoh.Config() + config = ( + zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH) + if ZENOH_CONFIG_FILEPATH + else zenoh.Config() + ) for key, value in zenoh_kwargs.items(): config.insert_json5(key, json.dumps(value)) return config - def await_connection(self, out_topic: Optional[str] = None, repeats: Optional[int] = None): + def await_connection( + self, out_topic: Optional[str] = None, repeats: Optional[int] = None + ): """ Wait for the connection to be established. @@ -255,12 +277,12 @@ def publish(self, img: np.ndarray): time.sleep(0.2) if ( - 0 < self.width != img.shape[1] - or 0 < self.height != img.shape[0] - or not ( + 0 < self.width != img.shape[1] + or 0 < self.height != img.shape[0] + or not ( (img.ndim == 2 and not self.rgb) or (img.ndim == 3 and self.rgb and img.shape[2] == 3) - ) + ) ): raise ValueError("Incorrect image shape for publisher") if not img.flags["C_CONTIGUOUS"]: @@ -272,10 +294,14 @@ def publish(self, img: np.ndarray): header = {"timestamp": time.time()} else: img_bytes = img.tobytes() - header = {"timestamp": time.time(), "shape": img.shape, "dtype": str(img.dtype)} + header = { + "timestamp": time.time(), + "shape": img.shape, + "dtype": str(img.dtype), + } - header_bytes = json.dumps(header).encode('utf-8') - payload = header_bytes + b'\n' + img_bytes + header_bytes = json.dumps(header).encode("utf-8") + payload = header_bytes + b"\n" + img_bytes ZenohMiddlewarePubSub._instance.session.put(self.out_topic, payload) @@ -309,7 +335,13 @@ def __init__( :param rate: int: Sampling rate. Default is 44100 :param chunk: int: Chunk size. Default is -1 (dynamic chunk size) """ - super().__init__(name, out_topic, should_wait=should_wait, multi_threaded=multi_threaded, **kwargs) + super().__init__( + name, + out_topic, + should_wait=should_wait, + multi_threaded=multi_threaded, + **kwargs, + ) self.channels = channels self.rate = rate self.chunk = chunk @@ -335,7 +367,9 @@ def publish(self, aud: Tuple[np.ndarray, int]): if 0 < self.rate != rate: raise ValueError("Incorrect audio rate for publisher") - chunk, channels = aud_array.shape if len(aud_array.shape) > 1 else (aud_array.shape[0], 1) + chunk, channels = ( + aud_array.shape if len(aud_array.shape) > 1 else (aud_array.shape[0], 1) + ) self.chunk = chunk if self.chunk == -1 else self.chunk self.channels = channels if self.channels == -1 else self.channels if 0 < self.chunk != chunk or 0 < self.channels != channels: @@ -348,11 +382,9 @@ def publish(self, aud: Tuple[np.ndarray, int]): "shape": aud_array.shape, "dtype": str(aud_array.dtype), "rate": rate, - "timestamp": time.time() + "timestamp": time.time(), } - header_bytes = json.dumps(header).encode('utf-8') - payload = header_bytes + b'\n' + aud_bytes + header_bytes = json.dumps(header).encode("utf-8") + payload = header_bytes + b"\n" + aud_bytes ZenohMiddlewarePubSub._instance.session.put(self.out_topic, payload) - -