Skip to content

Commit

Permalink
kiss: Add future argument to send()
Browse files Browse the repository at this point in the history
Allow the user to pass in a `Future` (or if `return_future` is set to
`True`, we create one) and have it resolved or rejected when the frame
is sent (or if sending fails).
  • Loading branch information
sjlongland committed May 26, 2024
1 parent 1f8e739 commit e3c388c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 22 deletions.
75 changes: 58 additions & 17 deletions aioax25/kiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ def __init__(
self._protocol = None
self._rx_buffer = bytearray()
self._tx_buffer = bytearray()
self._tx_queue = []
self._tx_future = None
self._loop = loop
self._port = {}
self._state = KISSDeviceState.CLOSED
Expand Down Expand Up @@ -297,7 +299,7 @@ def _receive(self, data):
else:
self._loop.call_soon(self._receive_frame)

def _send(self, frame):
def _send(self, frame, future=None):
"""
Send a frame via the underlying transport.
"""
Expand All @@ -306,11 +308,21 @@ def _send(self, frame):
if self._log.isEnabledFor(logging.DEBUG):
self._log.debug("XMIT FRAME %r", b2a_hex(rawframe).decode())

self._tx_queue.append((rawframe, future))
self._loop.call_soon(self._send_data)
return future

def _pick_next_tx(self):
try:
(rawframe, future) = self._tx_queue.pop(0)
except IndexError:
return

if not self._tx_buffer.endswith(bytearray([BYTE_FEND])):
self._tx_buffer += bytearray([BYTE_FEND])

self._tx_buffer += bytes(rawframe) + bytearray([BYTE_FEND])
self._loop.call_soon(self._send_data)
self._tx_future = future

def _receive_frame(self):
"""
Expand Down Expand Up @@ -401,23 +413,16 @@ def _send_data(self):
"""
Send the next block of data waiting in the buffer.
"""
if len(self._tx_buffer) == 0:
# Nothing pending to transmit
self._pick_next_tx()

data = self._tx_buffer[: self._send_block_size]
self._tx_buffer = self._tx_buffer[self._send_block_size :]

if self._log.isEnabledFor(logging.DEBUG):
self._log.debug("XMIT RAW %r", b2a_hex(data).decode())

self._try_send_raw_data(data)

# If we are closing, wait for this to be sent
if (self._state == KISSDeviceState.CLOSING) and (
len(self._tx_buffer) == 0
):
self._try_close()
return

if self._tx_buffer:
self._loop.call_later(self._send_block_delay, self._send_data)
self._try_send_raw_data(data, self._tx_future)

def _init_kiss(self):
assert self.state == KISSDeviceState.OPENING, "Device is not opening"
Expand Down Expand Up @@ -468,14 +473,48 @@ def _try_open(self):
self._open_queue = None
raise

def _try_send_raw_data(self, data):
def _try_send_raw_data(self, data, future=None):
try:
self._send_raw_data(data)
except:
(ex_type, ex_value, ex_traceback) = exc_info()
if future:
future.set_exception(ex_value)
self._on_fail("send", exc_info())
raise

# We assume it was sent
self._mark_sent(data, future)

def _mark_sent(self, data, future):
"""
Check for the presence of the given data at the start of the buffer,
de-queue it and check for anything further.
"""
# Sanity check, ensure the data is in the transmit buffer, then
# dequeue it!
try:
assert self._tx_buffer.startswith(
data
), "Did not find sent data in the transmit buffer!"
self._tx_buffer = self._tx_buffer[len(data) :]
except:
(ex_type, ex_value, ex_traceback) = exc_info()
if future:
future.set_exception(ex_value)
self._on_fail("send", exc_info())

# If we are closing, wait for this to be sent
if (self._state == KISSDeviceState.CLOSING) and (
len(self._tx_buffer) == 0
):
self._try_close()
return

# If there is more in the buffer, queue _send_data after a delay.
if self._tx_buffer:
self._loop.call_later(self._send_block_delay, self._send_data)

def _mark_closed(self, ex=None):
if ex is None:
# Mark the device as closed
Expand Down Expand Up @@ -844,12 +883,14 @@ def __init__(self, device, port, log):
def port(self):
return self._port

def send(self, frame):
def send(self, frame, future=None):
"""
Send a raw AX.25 frame to the TNC via this port.
"""
future = self._device._ensure_future(future)
self._log.debug("XMIT AX.25 %s", frame)
self._device._send(KISSCmdData(self.port, bytes(frame)))
self._device._send(KISSCmdData(self.port, bytes(frame)), future)
return future

def _receive_frame(self, frame):
"""
Expand Down
8 changes: 6 additions & 2 deletions tests/test_kiss/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def _close(self):
def _send_raw_data(self, data):
self.transmitted += data

def _ensure_future(self, future):
return future


class DummyFutureQueue(object):
def __init__(self):
Expand Down Expand Up @@ -225,8 +228,9 @@ def test_close_reset():
# Should be in the closing state
assert kissdev._state == KISSDeviceState.CLOSING

# A "return from KISS" frame should be in the transmit buffer
assert bytes(kissdev._tx_buffer) == b"\xc0\xff\xc0"
# A "return from KISS" frame should be in the transmit buffer (minus FEND
# bytes)
assert kissdev._tx_queue == [(b"\xff", None)]

# A call to _send_data should be pending
(_, func) = loop.calls.pop()
Expand Down
10 changes: 7 additions & 3 deletions tests/test_kiss/test_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ class DummyKISSDevice(object):
def __init__(self):
self.sent = []

def _send(self, frame):
self.sent.append(frame)
def _send(self, frame, future):
self.sent.append((frame, future))

def _ensure_future(self, future):
return future


def test_send():
Expand All @@ -25,8 +28,9 @@ def test_send():
port.send(b"this is a test frame")

assert len(dev.sent) == 1
last = dev.sent.pop(0)
(last, last_future) = dev.sent.pop(0)

assert last_future is None
assert isinstance(last, KISSCmdData)
assert last.port == 5
assert last.payload == b"this is a test frame"
Expand Down

0 comments on commit e3c388c

Please sign in to comment.