diff --git a/aioax25/kiss.py b/aioax25/kiss.py index 64ad54c..2d2e4d6 100644 --- a/aioax25/kiss.py +++ b/aioax25/kiss.py @@ -252,6 +252,8 @@ def __init__( self._protocol = None self._rx_buffer = bytearray() self._tx_buffer = bytearray() + self._tx_queue = [] + self._tx_future = None self._loop = loop self._port = {} self._state = KISSDeviceState.CLOSED @@ -297,7 +299,7 @@ def _receive(self, data): else: self._loop.call_soon(self._receive_frame) - def _send(self, frame): + def _send(self, frame, future=None): """ Send a frame via the underlying transport. """ @@ -306,11 +308,21 @@ def _send(self, frame): if self._log.isEnabledFor(logging.DEBUG): self._log.debug("XMIT FRAME %r", b2a_hex(rawframe).decode()) + self._tx_queue.append((rawframe, future)) + self._loop.call_soon(self._send_data) + return future + + def _pick_next_tx(self): + try: + (rawframe, future) = self._tx_queue.pop(0) + except IndexError: + return + if not self._tx_buffer.endswith(bytearray([BYTE_FEND])): self._tx_buffer += bytearray([BYTE_FEND]) self._tx_buffer += bytes(rawframe) + bytearray([BYTE_FEND]) - self._loop.call_soon(self._send_data) + self._tx_future = future def _receive_frame(self): """ @@ -401,23 +413,16 @@ def _send_data(self): """ Send the next block of data waiting in the buffer. """ + if len(self._tx_buffer) == 0: + # Nothing pending to transmit + self._pick_next_tx() + data = self._tx_buffer[: self._send_block_size] - self._tx_buffer = self._tx_buffer[self._send_block_size :] if self._log.isEnabledFor(logging.DEBUG): self._log.debug("XMIT RAW %r", b2a_hex(data).decode()) - self._try_send_raw_data(data) - - # If we are closing, wait for this to be sent - if (self._state == KISSDeviceState.CLOSING) and ( - len(self._tx_buffer) == 0 - ): - self._try_close() - return - - if self._tx_buffer: - self._loop.call_later(self._send_block_delay, self._send_data) + self._try_send_raw_data(data, self._tx_future) def _init_kiss(self): assert self.state == KISSDeviceState.OPENING, "Device is not opening" @@ -468,14 +473,48 @@ def _try_open(self): self._open_queue = None raise - def _try_send_raw_data(self, data): + def _try_send_raw_data(self, data, future=None): try: self._send_raw_data(data) except: (ex_type, ex_value, ex_traceback) = exc_info() + if future: + future.set_exception(ex_value) self._on_fail("send", exc_info()) raise + # We assume it was sent + self._mark_sent(data, future) + + def _mark_sent(self, data, future): + """ + Check for the presence of the given data at the start of the buffer, + de-queue it and check for anything further. + """ + # Sanity check, ensure the data is in the transmit buffer, then + # dequeue it! + try: + assert self._tx_buffer.startswith( + data + ), "Did not find sent data in the transmit buffer!" + self._tx_buffer = self._tx_buffer[len(data) :] + except: + (ex_type, ex_value, ex_traceback) = exc_info() + if future: + future.set_exception(ex_value) + self._on_fail("send", exc_info()) + + # If we are closing, wait for this to be sent + if (self._state == KISSDeviceState.CLOSING) and ( + len(self._tx_buffer) == 0 + ): + self._try_close() + return + + # If there is more in the buffer, queue _send_data after a delay. + if self._tx_buffer: + self._loop.call_later(self._send_block_delay, self._send_data) + def _mark_closed(self, ex=None): if ex is None: # Mark the device as closed @@ -844,12 +883,14 @@ def __init__(self, device, port, log): def port(self): return self._port - def send(self, frame): + def send(self, frame, future=None): """ Send a raw AX.25 frame to the TNC via this port. """ + future = self._device._ensure_future(future) self._log.debug("XMIT AX.25 %s", frame) - self._device._send(KISSCmdData(self.port, bytes(frame))) + self._device._send(KISSCmdData(self.port, bytes(frame)), future) + return future def _receive_frame(self, frame): """ diff --git a/tests/test_kiss/test_base.py b/tests/test_kiss/test_base.py index 410300d..5f70f4a 100644 --- a/tests/test_kiss/test_base.py +++ b/tests/test_kiss/test_base.py @@ -31,6 +31,9 @@ def _close(self): def _send_raw_data(self, data): self.transmitted += data + def _ensure_future(self, future): + return future + class DummyFutureQueue(object): def __init__(self): @@ -225,8 +228,9 @@ def test_close_reset(): # Should be in the closing state assert kissdev._state == KISSDeviceState.CLOSING - # A "return from KISS" frame should be in the transmit buffer - assert bytes(kissdev._tx_buffer) == b"\xc0\xff\xc0" + # A "return from KISS" frame should be in the transmit buffer (minus FEND + # bytes) + assert kissdev._tx_queue == [(b"\xff", None)] # A call to _send_data should be pending (_, func) = loop.calls.pop() diff --git a/tests/test_kiss/test_port.py b/tests/test_kiss/test_port.py index 44b7161..e19f68b 100644 --- a/tests/test_kiss/test_port.py +++ b/tests/test_kiss/test_port.py @@ -12,8 +12,11 @@ class DummyKISSDevice(object): def __init__(self): self.sent = [] - def _send(self, frame): - self.sent.append(frame) + def _send(self, frame, future): + self.sent.append((frame, future)) + + def _ensure_future(self, future): + return future def test_send(): @@ -25,8 +28,9 @@ def test_send(): port.send(b"this is a test frame") assert len(dev.sent) == 1 - last = dev.sent.pop(0) + (last, last_future) = dev.sent.pop(0) + assert last_future is None assert isinstance(last, KISSCmdData) assert last.port == 5 assert last.payload == b"this is a test frame"