diff --git a/README.md b/README.md index ad96070..4dc1b8f 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ PyPI - Version - +

diff --git a/scaler/about.py b/scaler/about.py index f7d787d..cd12fb0 100644 --- a/scaler/about.py +++ b/scaler/about.py @@ -1 +1 @@ -__version__ = "1.8.7" +__version__ = "1.8.8" diff --git a/scaler/io/async_binder.py b/scaler/io/async_binder.py index 68d5e1a..4132c5f 100644 --- a/scaler/io/async_binder.py +++ b/scaler/io/async_binder.py @@ -5,6 +5,7 @@ from typing import Awaitable, Callable, List, Optional, Dict import zmq.asyncio +from zmq import Frame from scaler.io.utility import deserialize, serialize from scaler.protocol.python.mixins import Message @@ -38,18 +39,18 @@ def register(self, callback: Callable[[bytes, Message], Awaitable[None]]): self._callback = callback async def routine(self): - frames = await self._socket.recv_multipart() + frames: List[Frame] = await self._socket.recv_multipart(copy=False) if not self.__is_valid_message(frames): return source, payload = frames - message: Optional[Message] = deserialize(payload) + message: Optional[Message] = deserialize(payload.bytes) if message is None: - logging.error(f"received unknown message from {source!r}: {payload!r}") + logging.error(f"received unknown message from {source.bytes!r}: {payload!r}") return self.__count_received(message.__class__.__name__) - await self._callback(source, message) + await self._callback(source.bytes, message) async def send(self, to: bytes, message: Message): self.__count_sent(message.__class__.__name__) @@ -63,7 +64,7 @@ def __set_socket_options(self): self._socket.setsockopt(zmq.SNDHWM, 0) self._socket.setsockopt(zmq.RCVHWM, 0) - def __is_valid_message(self, frames: List[bytes]) -> bool: + def __is_valid_message(self, frames: List[Frame]) -> bool: if len(frames) < 2: logging.error(f"{self.__get_prefix()} received unexpected frames {frames}") return False diff --git a/scaler/io/async_connector.py b/scaler/io/async_connector.py index 86be509..a9a02c4 100644 --- a/scaler/io/async_connector.py +++ b/scaler/io/async_connector.py @@ -82,10 +82,10 @@ async def receive(self) -> Optional[Message]: if self._socket.closed: return None - payload = await self._socket.recv() - result: Optional[Message] = deserialize(payload) + payload = await self._socket.recv(copy=False) + result: Optional[Message] = deserialize(payload.bytes) if result is None: - logging.error(f"received unknown message: {payload!r}") + logging.error(f"received unknown message: {payload.bytes!r}") return None return result diff --git a/scaler/io/sync_connector.py b/scaler/io/sync_connector.py index a45b6a5..b3c9cb8 100644 --- a/scaler/io/sync_connector.py +++ b/scaler/io/sync_connector.py @@ -47,13 +47,13 @@ def identity(self) -> bytes: def send(self, message: Message): with self._lock: - self._socket.send(serialize(message)) + self._socket.send(serialize(message), copy=False) def receive(self) -> Optional[Message]: with self._lock: - payload = self._socket.recv() + payload = self._socket.recv(copy=False) - return self.__compose_message(payload) + return self.__compose_message(payload.bytes) def __compose_message(self, payload: bytes) -> Optional[Message]: result: Optional[Message] = deserialize(payload) diff --git a/scaler/io/sync_subscriber.py b/scaler/io/sync_subscriber.py index 5423f2c..17bc5b7 100644 --- a/scaler/io/sync_subscriber.py +++ b/scaler/io/sync_subscriber.py @@ -69,7 +69,7 @@ def __initialize(self): def __routine_polling(self): try: - self.__routine_receive(self._socket.recv()) + self.__routine_receive(self._socket.recv(copy=False).bytes) except zmq.Again: raise TimeoutError(f"Cannot connect to {self._address.to_address()} in {self._timeout_seconds} seconds")