forked from RoboLiga/ev3-nabiralec
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.py
64 lines (56 loc) · 1.83 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#!/home/robot/roboliga/bin/python
# -*- coding: utf-8 -*-
import pycurl
import ujson
from io import BytesIO
from time import time
def robot_die():
    """
    Placeholder with an empty body; always returns None.

    `Connection.test_delay` calls this when a request fails.
    NOTE(review): presumably the real implementation stops the robot and
    lives elsewhere - confirm.
    """
    return None
class Connection:
    """
    Client that repeatedly fetches a JSON document from a server URL.

    One pycurl handle and one in-memory buffer are created in the constructor
    and reused for every request.
    """

    def __init__(self, url: str) -> None:
        """
        Set up a new connection.

        Args:
            url: URL of the resource on the server.
        """
        self._url = url
        # pycurl writes each response body into this buffer.
        self._buffer = BytesIO()
        self._pycurlObj = pycurl.Curl()
        self._pycurlObj.setopt(self._pycurlObj.URL, self._url)
        # Limit for the connection phase (seconds, per libcurl's option).
        self._pycurlObj.setopt(self._pycurlObj.CONNECTTIMEOUT, 10)
        self._pycurlObj.setopt(self._pycurlObj.WRITEDATA, self._buffer)

    def request(self, debug: bool = False):
        """
        Fetch the resource from the server and parse it as JSON.

        Args:
            debug: when True, print the parse error and the server's message
                if the response cannot be parsed.

        Returns:
            The parsed JSON value, or -1 if the response body is not valid
            UTF-8 or not valid JSON.

        Any exception raised by the transfer itself (`perform()`) is not
        caught here and propagates to the caller.
        """
        # Clear the buffer so the previous response is not mixed in.
        self._buffer.seek(0, 0)
        self._buffer.truncate()
        # Send the request to the server.
        self._pycurlObj.perform()
        raw = self._buffer.getvalue()
        try:
            # Decode inside the try block: UnicodeDecodeError is a subclass
            # of ValueError, so a malformed body yields -1 like bad JSON does.
            return ujson.loads(raw.decode())
        except ValueError as err:
            if debug:
                print('Napaka pri razclenjevanju datoteke JSON: ' + str(err))
                print('Sporocilo streznika:')
                # Undecodable bytes are replaced so printing cannot fail.
                print(raw.decode(errors='replace'))
            return -1

    def test_delay(self, num_iters: int = 10) -> float:
        """
        Measure the average time of a request to the server.

        Purely informative. If any request returns -1, `robot_die()` is
        called and the measurement continues.

        Args:
            num_iters: number of requests to time.

        Returns:
            Mean elapsed wall-clock time per request in seconds, or 0.0 when
            `num_iters` is not positive (no requests are made).
        """
        # Guard against division by zero when nothing is measured.
        if num_iters <= 0:
            return 0.0
        sum_time = 0.0
        for _ in range(num_iters):
            start_time = time()
            if self.request(True) == -1:
                robot_die()
            sum_time += time() - start_time
        return sum_time / num_iters