Skip to content

Commit

Permalink
fix: refactor teamwork client and fix invalid code for teamwork
Browse files Browse the repository at this point in the history
  • Loading branch information
daxartio committed Nov 25, 2023
1 parent 12ea8fb commit aee1e99
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 105 deletions.
2 changes: 1 addition & 1 deletion sportorg/gui/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def teamwork(self, command):
# logging.info(repr(command.data))
# if 'object' in command.data and command.data['object'] in
# ['ResultManual', 'ResultSportident', 'ResultSFR', 'ResultSportiduino']:
if command.header.objType in [
if command.header.obj_type in [
ObjectTypes.Result.value,
ObjectTypes.ResultManual.value,
ObjectTypes.ResultSportident.value,
Expand Down
1 change: 0 additions & 1 deletion sportorg/models/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from abc import abstractmethod
from datetime import date
from enum import Enum, IntEnum
from typing import Dict
from typing import Optional

import dateutil.parser
Expand Down
167 changes: 71 additions & 96 deletions sportorg/modules/teamwork/client.py
Original file line number Diff line number Diff line change
@@ -1,106 +1,77 @@
import queue
import select
import socket
from threading import Event, Thread, main_thread
from threading import Event, Thread
from typing import Tuple

import orjson

from .packet_header import Header, Operations
from .server import Command


class ClientSenderThread(Thread):
def __init__(self, conn, in_queue, stop_event, logger):
super().__init__(daemon=True)
self.setName(self.__class__.__name__)
self.conn = conn
class ClientSender:
def __init__(self, in_queue: queue.Queue):
self._in_queue = in_queue
self._stop_event = stop_event
self._logger = logger

def run(self):
self._logger.debug('Client sender start')
while True:
try:
cmd = self._in_queue.get(timeout=5)
self.conn.sendall(cmd.get_packet())
except queue.Empty:
if not main_thread().is_alive() or self._stop_event.is_set():
break
except ConnectionResetError as e:
self._logger.debug(str(e))
break
except Exception as e:
self._logger.debug(str(e))
break
self.conn.close()
self._logger.debug('Client sender shutdown')
self._stop_event.set()

def send(self, conn: socket.socket) -> None:
try:
while True:
cmd = self._in_queue.get_nowait()
conn.sendall(cmd.get_packet())
except queue.Empty:
return

class ClientReceiverThread(Thread):
def __init__(self, conn, out_queue, stop_event, logger):
super().__init__()
self.setName(self.__class__.__name__)
self.conn = conn
self._out_queue = out_queue
self._stop_event = stop_event
self._logger = logger

def run(self):
full_data = b''
self.conn.settimeout(5)
self._logger.debug('Client receiver start')
hdr = Header()
is_new_pack = True
class ClientReceiver:
MSG_SIZE = 1024

def __init__(self, out_queue: queue.Queue):
self._out_queue = out_queue
self._full_data = b''
self._hdr = Header()
self._is_new_pack = True

def read(self, conn: socket.socket) -> None:
data = conn.recv(self.MSG_SIZE)
if not data:
return
self._full_data += data
while True:
try:
data = self.conn.recv(1024)
if not data:
# getting Header
if self._is_new_pack:
if len(self._full_data) >= self._hdr.header_size:
self._hdr.unpack_header(self._full_data[: self._hdr.header_size])
self._full_data = self._full_data[self._hdr.header_size :]
self._is_new_pack = False
else:
break
full_data += data
while True:
# getting Header
if is_new_pack:
if len(full_data) >= hdr.header_size:
hdr.unpack_header(full_data[: hdr.header_size])
full_data = full_data[hdr.header_size :]
is_new_pack = False
else:
break
# Getting JSON data
else:
if len(full_data) >= hdr.size:
command = Command(
orjson.loads(full_data[: hdr.size].decode()),
Operations(hdr.op_type).name,
)
self._out_queue.put(command) # for local
full_data = full_data[hdr.size :]
is_new_pack = True
else:
break
except socket.timeout:
if not main_thread().is_alive() or self._stop_event.is_set():
# Getting JSON data
else:
if len(self._full_data) >= self._hdr.size:
command = Command(
orjson.loads(self._full_data[: self._hdr.size].decode()),
Operations(self._hdr.op_type).name,
)
self._out_queue.put_nowait(command)
self._full_data = self._full_data[self._hdr.size :]
self._is_new_pack = True
else:
break
except ConnectionAbortedError as e:
self._logger.exception(e)
break
except ConnectionResetError as e:
self._logger.exception(e)
break
except Exception as e:
self._logger.exception(e)
break
self._logger.debug('Client receiver shutdown')
self._stop_event.set()


class ClientThread(Thread):
def __init__(self, addr, in_queue, out_queue, stop_event, logger):
def __init__(
self,
addr: Tuple[str, int],
in_queue: queue.Queue,
out_queue: queue.Queue,
stop_event: Event,
logger,
):
super().__init__()
self.setName(self.__class__.__name__)
self.addr = addr
self.setName('Teamwork Client')
self._addr = addr
self._in_queue = in_queue
self._out_queue = out_queue
self._stop_event = stop_event
Expand All @@ -113,27 +84,31 @@ def join_client(self) -> None:
def run(self) -> None:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect(self.addr)
self._logger.info('Client start')
sender = ClientSenderThread(
s, self._in_queue, self._stop_event, self._logger
)
sender.start()
receiver = ClientReceiverThread(
s, self._out_queue, self._stop_event, self._logger
)
receiver.start()
s.connect(self._addr)
s.settimeout(5)
s.setblocking(False)
self._logger.info('Client started')
self._client_started.set()

sender.join()
receiver.join()
sender = ClientSender(self._in_queue)
receiver = ClientReceiver(self._out_queue)
sockets = [s]
while True:
if self._stop_event.is_set():
break
rread, rwrite, err = select.select(sockets, sockets, [], 1)
if rread:
receiver.read(s)
if rwrite:
sender.send(s)

except ConnectionRefusedError as e:
self._logger.exception(e)
self._stop_event.set()
return
except Exception as e:
self._logger.exception(e)
self._stop_event.set()
return

s.close()
self._logger.info('Client shutdown')
self._logger.info('Client stopped')
13 changes: 7 additions & 6 deletions sportorg/modules/teamwork/server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import socket
from queue import Empty, Queue
from threading import Event, Thread, main_thread

import orjson

from .packet_header import Header, ObjectTypes, Operations


Expand All @@ -22,7 +23,7 @@ def exclude(self, addr):
return self

def get_packet(self):
pack_data = json.dumps(self.data).encode()
pack_data = orjson.dumps(self.data)
return self.header.pack_header(len(pack_data)) + pack_data


Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(self, conn, in_queue, out_queue, stop_event, logger):

def run(self):
with self.connect.conn:
self._logger.debug('Server receiver start')
self._logger.debug('Server receiver started')
self._logger.info('Connected by {}'.format(self.connect.addr))
full_data = b''
self.connect.conn.settimeout(5)
Expand All @@ -75,7 +76,7 @@ def run(self):
else:
if len(full_data) >= hdr.size:
command = Command(
json.loads(full_data[: hdr.size].decode()),
orjson.loads(full_data[: hdr.size].decode()),
Operations(hdr.op_type).name,
self.connect.addr,
)
Expand Down Expand Up @@ -166,7 +167,7 @@ def run(self) -> None:
s.listen(1)
s.settimeout(5)

self._logger.info('Server start')
self._logger.info('Server started')

conns_queue = Queue() # type: ignore
sender = ServerSenderThread(
Expand Down Expand Up @@ -199,4 +200,4 @@ def run(self) -> None:
sender.join()
for srt in connections:
srt.join()
self._logger.info('Server shutdown')
self._logger.info('Server stopped')
2 changes: 1 addition & 1 deletion tests/test_teamwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def test_teamwork():
'Create',
)
)
result = client_out_queue.get(timeout=5)
result = client_out_queue.get(timeout=10)
assert result.data == {
'object': 'Person',
'id': 'c24eef6c-a33b-4581-a6d1-78294711aef1',
Expand Down

0 comments on commit aee1e99

Please sign in to comment.