Skip to content

Commit

Permalink
Automated version bump and other updates
Browse files Browse the repository at this point in the history
  • Loading branch information
actions-user committed Nov 1, 2024
1 parent c3ed213 commit 32f1fe3
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 48 deletions.
8 changes: 8 additions & 0 deletions docs/source/wrapyfi.listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ wrapyfi.listeners.yarp module
:undoc-members:
:show-inheritance:

wrapyfi.listeners.zenoh module
------------------------------

.. automodule:: wrapyfi.listeners.zenoh
:members:
:undoc-members:
:show-inheritance:

wrapyfi.listeners.zeromq module
-------------------------------

Expand Down
8 changes: 8 additions & 0 deletions docs/source/wrapyfi.middlewares.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ wrapyfi.middlewares.yarp module
:undoc-members:
:show-inheritance:

wrapyfi.middlewares.zenoh module
--------------------------------

.. automodule:: wrapyfi.middlewares.zenoh
:members:
:undoc-members:
:show-inheritance:

wrapyfi.middlewares.zeromq module
---------------------------------

Expand Down
8 changes: 8 additions & 0 deletions docs/source/wrapyfi.publishers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ wrapyfi.publishers.yarp module
:undoc-members:
:show-inheritance:

wrapyfi.publishers.zenoh module
-------------------------------

.. automodule:: wrapyfi.publishers.zenoh
:members:
:undoc-members:
:show-inheritance:

wrapyfi.publishers.zeromq module
--------------------------------

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

[project]
name = "wrapyfi"
version = "0.4.50"
version = "0.4.51"
description = "Wrapyfi is a wrapper for simplifying Middleware communication"
readme = "README.md"
authors = [
Expand Down
13 changes: 12 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,18 @@ def check_cv2(default_python="opencv-python"):
+ check_cv2("opencv-python-headless"),
"headless_mqtt": ["wrapyfi[mqtt]", "wrapyfi[numpy]"]
+ check_cv2("opencv-python-headless"),
"complete": ["wrapyfi[numpy]", "sounddevice", "soundfile", "Pillow", "pandas", "wrapyfi[pyzmq]", "wrapyfi[websocket]", "wrapyfi[zenoh]", "wrapyfi[mqtt]", "wrapyfi[docs]"]
"complete": [
"wrapyfi[numpy]",
"sounddevice",
"soundfile",
"Pillow",
"pandas",
"wrapyfi[pyzmq]",
"wrapyfi[websocket]",
"wrapyfi[zenoh]",
"wrapyfi[mqtt]",
"wrapyfi[docs]",
]
+ check_cv2("opencv-contrib-python"),
"all": ["wrapyfi[numpy]", "wrapyfi[pyzmq]"]
+ check_cv2("opencv-contrib-python"),
Expand Down
110 changes: 89 additions & 21 deletions wrapyfi/listeners/zenoh.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
ZENOH_CONNECT = json.loads(os.getenv("WRAPYFI_ZENOH_CONNECT", "[]"))
ZENOH_LISTEN = json.loads(os.getenv("WRAPYFI_ZENOH_LISTEN", "[]"))
ZENOH_CONFIG_FILEPATH = os.getenv("WRAPYFI_ZENOH_CONFIG_FILEPATH", None)
ZENOH_MONITOR_LISTENER_SPAWN = os.getenv("WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread")
ZENOH_MONITOR_LISTENER_SPAWN = os.getenv(
"WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread"
)

WATCHDOG_POLL_INTERVAL = float(os.getenv("WRAPYFI_ZENOH_RETRY_INTERVAL", 0.2))
WATCHDOG_POLL_REPEATS = int(os.getenv("WRAPYFI_ZENOH_MAX_REPEATS", -1))
Expand All @@ -41,8 +43,17 @@ class ZenohListener(Listener):
Merges listener-specific settings and environment configurations, and awaits connection.
"""

def __init__(self, name: str, in_topic: str, should_wait: bool = True,
ip: str = ZENOH_IP, port: int = ZENOH_PORT, mode: str = ZENOH_MODE, zenoh_kwargs: Optional[dict] = None, **kwargs):
def __init__(
self,
name: str,
in_topic: str,
should_wait: bool = True,
ip: str = ZENOH_IP,
port: int = ZENOH_PORT,
mode: str = ZENOH_MODE,
zenoh_kwargs: Optional[dict] = None,
**kwargs,
):
"""
Initializes the Zenoh listener with environment or parameter-based configurations
and waits for connection if specified.
Expand All @@ -64,13 +75,27 @@ def __init__(self, name: str, in_topic: str, should_wait: bool = True,
# Prepare Zenoh configuration from environment variables and kwargs
self.zenoh_config = {
"mode": mode,
"connect/endpoints": ZENOH_CONNECT if isinstance(ZENOH_CONNECT, list) else ZENOH_CONNECT.split(",") if isinstance(ZENOH_CONNECT, str) else [f"tcp/{ip}:{port}"],
**(zenoh_kwargs or {})
"connect/endpoints": (
ZENOH_CONNECT
if isinstance(ZENOH_CONNECT, list)
else (
ZENOH_CONNECT.split(",")
if isinstance(ZENOH_CONNECT, str)
else [f"tcp/{ip}:{port}"]
)
),
**(zenoh_kwargs or {}),
}
if ZENOH_LISTEN:
self.zenoh_config["listen/endpoints"] = ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",")
self.zenoh_config["listen/endpoints"] = (
ZENOH_LISTEN
if isinstance(ZENOH_LISTEN, list)
else ZENOH_LISTEN.split(",")
)

ZenohMiddlewarePubSub.activate(config=self._prepare_config(self.zenoh_config), **kwargs)
ZenohMiddlewarePubSub.activate(
config=self._prepare_config(self.zenoh_config), **kwargs
)

self.established = False

Expand All @@ -81,12 +106,18 @@ def _prepare_config(self, zenoh_kwargs):
:param zenoh_kwargs: dict: Configuration parameters
:return: zenoh.Config: Configured Zenoh session
"""
config = zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH) if ZENOH_CONFIG_FILEPATH else zenoh.Config()
config = (
zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH)
if ZENOH_CONFIG_FILEPATH
else zenoh.Config()
)
for key, value in zenoh_kwargs.items():
config.insert_json5(key, json.dumps(value))
return config

def await_connection(self, in_topic: Optional[str] = None, repeats: int = WATCHDOG_POLL_REPEATS):
def await_connection(
self, in_topic: Optional[str] = None, repeats: int = WATCHDOG_POLL_REPEATS
):
"""
Waits for the Zenoh connection to be established.
Expand Down Expand Up @@ -114,7 +145,9 @@ def establish(self, repeats: Optional[int] = None, **kwargs):
established = self.await_connection(repeats=repeats)
established = self.check_establishment(established)
if established:
ZenohMiddlewarePubSub._instance.register_callback(self.in_topic, self.on_message)
ZenohMiddlewarePubSub._instance.register_callback(
self.in_topic, self.on_message
)
return established

def close(self):
Expand All @@ -137,7 +170,14 @@ class ZenohNativeObjectListener(ZenohListener):
:param deserializer_kwargs: dict: Keyword arguments for the JSON deserializer
"""

def __init__(self, name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: Optional[dict] = None, **kwargs):
def __init__(
self,
name: str,
in_topic: str,
should_wait: bool = True,
deserializer_kwargs: Optional[dict] = None,
**kwargs,
):
super().__init__(name, in_topic, should_wait=should_wait, **kwargs)
self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook
self._message_queue = queue.Queue()
Expand All @@ -150,7 +190,11 @@ def on_message(self, sample):
:param sample: zenoh.Sample: The Zenoh sample received
"""
try:
obj = json.loads(sample.payload.to_bytes(), object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs)
obj = json.loads(
sample.payload.to_bytes(),
object_hook=self._plugin_decoder_hook,
**self._deserializer_kwargs,
)
self._message_queue.put(obj)
logging.debug(f"Queued message for topic {self.in_topic}: {obj}")
except json.JSONDecodeError as e:
Expand Down Expand Up @@ -188,7 +232,17 @@ class ZenohImageListener(ZenohNativeObjectListener):
:param jpg: bool: True if the image is JPEG-compressed
"""

def __init__(self, name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, jpg: bool = False, **kwargs):
def __init__(
self,
name: str,
in_topic: str,
should_wait: bool = True,
width: int = -1,
height: int = -1,
rgb: bool = True,
jpg: bool = False,
**kwargs,
):
super().__init__(name, in_topic, should_wait=should_wait, **kwargs)
self.width = width
self.height = height
Expand All @@ -205,15 +259,21 @@ def on_message(self, sample):
try:
# Split payload into header and image data
payload = sample.payload.to_bytes()
header_bytes, img_bytes = payload.split(b'\n', 1) # Split at the first newline
header_bytes, img_bytes = payload.split(
b"\n", 1
) # Split at the first newline

header = json.loads(header_bytes.decode('utf-8'))
header = json.loads(header_bytes.decode("utf-8"))
np_data = np.frombuffer(img_bytes, dtype=np.uint8)

if self.jpg:
img = cv2.imdecode(np_data, cv2.IMREAD_COLOR if self.rgb else cv2.IMREAD_GRAYSCALE)
img = cv2.imdecode(
np_data, cv2.IMREAD_COLOR if self.rgb else cv2.IMREAD_GRAYSCALE
)
else:
shape = header.get("shape", (self.height, self.width, 3 if self.rgb else 1))
shape = header.get(
"shape", (self.height, self.width, 3 if self.rgb else 1)
)
img = np_data.reshape(shape)

# Place the decoded image into the message queue
Expand Down Expand Up @@ -252,7 +312,16 @@ class ZenohAudioChunkListener(ZenohNativeObjectListener):
:param chunk: int: Number of samples in the audio chunk
"""

def __init__(self, name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs):
def __init__(
self,
name: str,
in_topic: str,
should_wait: bool = True,
channels: int = 1,
rate: int = 44100,
chunk: int = -1,
**kwargs,
):
super().__init__(name, in_topic, should_wait=should_wait, **kwargs)
self.channels = channels
self.rate = rate
Expand All @@ -267,8 +336,8 @@ def on_message(self, sample):
"""
try:
payload = sample.payload.to_bytes()
header_bytes, aud_bytes = payload.split(b'\n', 1)
header = json.loads(header_bytes.decode('utf-8'))
header_bytes, aud_bytes = payload.split(b"\n", 1)
header = json.loads(header_bytes.decode("utf-8"))
shape = header.get("shape")
rate = header.get("rate")
aud_array = np.frombuffer(aud_bytes, dtype=np.float32).reshape(shape)
Expand All @@ -291,4 +360,3 @@ def listen(self):
return self._message_queue.get(block=self.should_wait)
except queue.Empty:
return None, self.rate

6 changes: 3 additions & 3 deletions wrapyfi/middlewares/zenoh.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ def activate(config: zenoh.Config = None, **kwargs):
"""
zenoh.init_log_from_env_or("error")
if ZenohMiddlewarePubSub._instance is None:
ZenohMiddlewarePubSub._instance = ZenohMiddlewarePubSub(config=config, **kwargs)
ZenohMiddlewarePubSub._instance = ZenohMiddlewarePubSub(
config=config, **kwargs
)
return ZenohMiddlewarePubSub._instance

def __init__(self, config: zenoh.Config = None, **kwargs):
Expand Down Expand Up @@ -75,7 +77,6 @@ def register_callback(self, topic: str, callback):
self.subscribers[topic] = self.session.declare_subscriber(topic, callback)
logging.info(f"[ZenohMiddlewarePubSub] Registered callback for topic {topic}")


def is_connected(self) -> bool:
"""
Checks if the Zenoh session is active.
Expand All @@ -90,4 +91,3 @@ def deinit(self):
"""
logging.info("[ZenohMiddlewarePubSub] Closing Zenoh session")
self.session.close()

Loading

0 comments on commit 32f1fe3

Please sign in to comment.