diff --git a/aioax25/peer.py b/aioax25/peer.py index 7469d43..5d7e30b 100644 --- a/aioax25/peer.py +++ b/aioax25/peer.py @@ -570,6 +570,23 @@ def _on_receive(self, frame): self.received_frame.emit(frame=frame, peer=self) return self._send_dm() + def _on_receive_isframe_nr_ns(self, frame): + """ + Handle the N(R) / N(S) fields from an I or S frame from the peer. + """ + # "Whenever an I or S frame is correctly received, even in a busy + # condition, the N(R) of the received frame should be checked to see + # if it includes an acknowledgement of outstanding sent I frames. The + # T1 timer should be cancelled if the received frame actually + # acknowledges previously unacknowledged frames. If the T1 timer is + # cancelled and there are still some frames that have been sent that + # are not acknowledged, T1 should be started again. If the T1 timer + # runs out before an acknowledgement is received, the device should + # proceed to the retransmission procedure in 2.4.4.9." + + # Check N(R) for received frames. + self._ack_outstanding((frame.nr - 1) % self._modulo) + def _on_receive_iframe(self, frame): """ Handle an incoming I-frame @@ -642,23 +659,9 @@ def _on_receive_sframe(self, frame): self._on_receive_rej(frame) elif isinstance(frame, self._SREJFrameClass): self._on_receive_srej(frame) - - def _on_receive_isframe_nr_ns(self, frame): - """ - Handle the N(R) / N(S) fields from an I or S frame from the peer. - """ - # "Whenever an I or S frame is correctly received, even in a busy - # condition, the N(R) of the received frame should be checked to see - # if it includes an acknowledgement of outstanding sent I frames. The - # T1 timer should be cancelled if the received frame actually - # acknowledges previously unacknowledged frames. If the T1 timer is - # cancelled and there are still some frames that have been sent that - # are not acknowledged, T1 should be started again. If the T1 timer - # runs out before an acknowledgement is received, the device should - # proceed to the retransmission procedure in 2.4.4.9." - - # Check N(R) for received frames. - self._ack_outstanding((frame.nr - 1) % self._modulo) + else: # pragma: no cover + # Should be impossible to get here! + raise TypeError("Unhandled frame: %r" % frame) def _on_receive_rr(self, frame): if frame.pf: @@ -670,9 +673,6 @@ def _on_receive_rr(self, frame): self._log.debug( "RR notification received from peer N(R)=%d", frame.nr ) - # AX.25 sect 4.3.2.1: "acknowledges properly received - # I frames up to and including N(R)-1" - self._ack_outstanding((frame.nr - 1) % self._modulo) self._peer_busy = False self._send_next_iframe() @@ -684,8 +684,6 @@ def _on_receive_rnr(self, frame): else: # Received peer's RNR status, peer is busy self._log.debug("RNR notification received from peer") - # AX.25 sect 4.3.2.2: "Frames up to N(R)-1 are acknowledged." - self._ack_outstanding((frame.nr - 1) % self._modulo) self._peer_busy = True def _on_receive_rej(self, frame): @@ -696,9 +694,6 @@ def _on_receive_rej(self, frame): else: # Reject reject. self._log.debug("REJ notification received from peer") - # AX.25 sect 4.3.2.3: "Any frames sent with a sequence number - # of N(R)-1 or less are acknowledged." - self._ack_outstanding((frame.nr - 1) % self._modulo) # AX.25 2.2 section 6.4.7 says we set V(S) to this frame's # N(R) and begin re-transmission. self._log.debug("Set state V(S) from frame N(R) = %d", frame.nr) @@ -713,7 +708,8 @@ def _on_receive_srej(self, frame): # '1', then I frames numbered up to N(R)-1 inclusive are considered # as acknowledged." self._log.debug("SREJ received with P/F=1") - self._ack_outstanding((frame.nr - 1) % self._modulo) + # TODO: but we always ACK up to N(R)-1 on receipt of a S-frame? + # What if the P/F bit is 0? # Re-send the outstanding frame self._log.debug("Re-sending I-frame %d due to SREJ", frame.nr) diff --git a/tests/mocks.py b/tests/mocks.py index a60e285..93eef60 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -57,6 +57,9 @@ def warning(self, msg, *args, **kwargs): def getChild(self, name): return DummyLogger(self.name + "." + name, parent=self) + def isEnabledFor(self, level): + return True + class DummyTimeout(object): def __init__(self, delay, callback, *args, **kwargs): diff --git a/tests/test_peer/test_connection.py b/tests/test_peer/test_connection.py index a058ed2..ec858df 100644 --- a/tests/test_peer/test_connection.py +++ b/tests/test_peer/test_connection.py @@ -1106,6 +1106,590 @@ def _received_information(frame, payload, **kwargs): ] +def test_recv_sframe_rr_req_busy(): + """ + Test that RR with P/F set while busy sends RNR + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._local_busy = True + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x51", + ) + ) + + # We should send a RNR in reply + assert count == dict(send_rr=0, send_rnr=1, send_next_iframe=0) + + +def test_recv_sframe_rr_req_notbusy(): + """ + Test that RR with P/F set while not busy sends RR + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._local_busy = False + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x51", + ) + ) + + # We should send a RR in reply + assert count == dict(send_rr=1, send_rnr=0, send_next_iframe=0) + + +def test_recv_sframe_rr_rep(): + """ + Test that RR with P/F clear marks peer not busy + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._peer_busy = True + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x41", + ) + ) + + # Busy flag should be cleared + assert peer._peer_busy is False + + # We should send the next I-frame in reply + assert count == dict(send_rr=0, send_rnr=0, send_next_iframe=1) + + +def test_recv_sframe_rnr_req_busy(): + """ + Test that RNR with P/F set while busy sends RNR + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._local_busy = True + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x55", + ) + ) + + # We should send a RNR in reply + assert count == dict(send_rr=0, send_rnr=1, send_next_iframe=0) + + +def test_recv_sframe_rnr_req_notbusy(): + """ + Test that RNR with P/F set while not busy sends RR + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._local_busy = False + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x55", + ) + ) + + # We should send a RR in reply + assert count == dict(send_rr=1, send_rnr=0, send_next_iframe=0) + + +def test_recv_sframe_rnr_rep(): + """ + Test that RNR with P/F clear marks peer busy + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._peer_busy = False + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x45", + ) + ) + + # Busy flag should be set + assert peer._peer_busy is True + + +def test_recv_sframe_rej_req_busy(): + """ + Test that REJ with P/F set while busy sends RNR + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + state_updates = [] + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + setattr( + peer, + prop, + kwargs.get("value", getattr(peer, prop)) + kwargs.get("delta", 0), + ) + + peer._update_state = _update_state + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._local_busy = True + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x59", + ) + ) + + # We should update due to resets and peer ACKs + assert state_updates == [ + {"comment": "reset", "prop": "_send_state", "value": 0}, + {"comment": "reset", "prop": "_send_seq", "value": 0}, + {"comment": "reset", "prop": "_recv_state", "value": 0}, + {"comment": "reset", "prop": "_recv_seq", "value": 0}, + {"comment": "ACKed by peer N(R)", "delta": 1, "prop": "_recv_seq"}, + ] + + # We should send a RNR in reply + assert count == dict(send_rr=0, send_rnr=1, send_next_iframe=0) + + +def test_recv_sframe_rej_req_notbusy(): + """ + Test that REJ with P/F set while not busy sends RR + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + state_updates = [] + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + setattr( + peer, + prop, + kwargs.get("value", getattr(peer, prop)) + kwargs.get("delta", 0), + ) + + peer._update_state = _update_state + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._local_busy = False + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x59", + ) + ) + + # State updates should be a reset and peer ACK + assert state_updates == [ + {"comment": "reset", "prop": "_send_state", "value": 0}, + {"comment": "reset", "prop": "_send_seq", "value": 0}, + {"comment": "reset", "prop": "_recv_state", "value": 0}, + {"comment": "reset", "prop": "_recv_seq", "value": 0}, + {"comment": "ACKed by peer N(R)", "delta": 1, "prop": "_recv_seq"}, + ] + + # We should send a RR in reply + assert count == dict(send_rr=1, send_rnr=0, send_next_iframe=0) + + +def test_recv_sframe_rej_rep(): + """ + Test that REJ with P/F clear marks peer busy + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + count = dict(send_rr=0, send_rnr=0, send_next_iframe=0) + state_updates = [] + + def _send_rr(): + count["send_rr"] += 1 + + peer._send_rr_notification = _send_rr + + def _send_rnr(): + count["send_rnr"] += 1 + + peer._send_rnr_notification = _send_rnr + + def _send_next_iframe(): + count["send_next_iframe"] += 1 + + peer._send_next_iframe = _send_next_iframe + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + setattr( + peer, + prop, + kwargs.get("value", getattr(peer, prop)) + kwargs.get("delta", 0), + ) + + peer._update_state = _update_state + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(False) + peer._peer_busy = False + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x49", + ) + ) + + assert state_updates == [ + # Reset state + {"comment": "reset", "prop": "_send_state", "value": 0}, + {"comment": "reset", "prop": "_send_seq", "value": 0}, + {"comment": "reset", "prop": "_recv_state", "value": 0}, + {"comment": "reset", "prop": "_recv_seq", "value": 0}, + # Peer ACK + {"comment": "ACKed by peer N(R)", "delta": 1, "prop": "_recv_seq"}, + # REJ handling + {"comment": "from REJ N(R)", "prop": "_send_state", "value": 2}, + ] + + # We should send an I-frame in reply + assert count == dict(send_rr=0, send_rnr=0, send_next_iframe=1) + + +def test_recv_sframe_srej_pf(): + """ + Test that REJ with P/F set retransmits specified frame + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + iframes_rqd = [] + + def _transmit_iframe(nr): + iframes_rqd.append(nr) + + peer._transmit_iframe = _transmit_iframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(True) + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x0d\x55", + ) + ) + + assert iframes_rqd == [42] + + +def test_recv_sframe_srej_nopf(): + """ + Test that REJ with P/F clear retransmits specified frame + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub the functions called + iframes_rqd = [] + + def _transmit_iframe(nr): + iframes_rqd.append(nr) + + peer._transmit_iframe = _transmit_iframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._init_connection(True) + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x0d\x54", + ) + ) + + assert iframes_rqd == [42] + + def test_recv_disc(): """ Test that DISC is handled.