-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_speedtest.py
129 lines (105 loc) · 3.28 KB
/
run_speedtest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from sqlite3 import connect
import importlib
from pprint import pprint
from datetime import datetime
from config import COLUMN_TYPES
try:
import speedtest
except ImportError:
speedtest = importlib.import_module("speedtest-cli.speedtest")
def run_speedtest(
    secure: bool = True,
    servers: list = None,
    threads: int = None,
) -> dict:
    """Measure download and upload speed and return the collected results.

    Arguments:
    ----------
    secure: bool
        Forwarded to ``speedtest.Speedtest(secure=...)``. Default is True.
    servers: list
        Handed to ``get_servers``. Any value that is not a list
        (including the default None) is replaced by an empty list
        before the call.
    threads: int
        Forwarded unchanged to both ``download`` and ``upload``.
        Default is None.

    Returns:
    --------
    dict
        The value of ``results.dict()`` after the download test, the
        upload test and ``results.share()`` have all completed.
    """
    # Normalise the server filter: only a real list is passed through.
    server_filter = servers if isinstance(servers, list) else []

    tester = speedtest.Speedtest(secure=secure)
    tester.get_servers(server_filter)
    tester.get_best_server()

    # Run both measurements with the same thread setting.
    tester.download(threads=threads)
    tester.upload(threads=threads)

    # NOTE(review): share() presumably publishes the result and adds a
    # share link to the results -- confirm against the speedtest-cli docs.
    tester.results.share()
    return tester.results.dict()
def create_table(db_name, table_name):
    """Create ``table_name`` in the SQLite database ``db_name`` if missing.

    The column definitions are taken from the module-level
    ``COLUMN_TYPES`` mapping: every ``name -> type`` pair becomes one
    ``name type`` column in the ``CREATE TABLE IF NOT EXISTS`` statement.

    Arguments:
    ----------
    db_name:
        Path handed to ``sqlite3.connect``.
    table_name:
        Name of the table. It is interpolated into the SQL text verbatim
        (identifiers cannot be bound as parameters), so it must come
        from trusted code, never from user input.
    """
    # Build the column list up front instead of nesting a multi-line
    # expression inside the f-string.
    columns = ",".join(
        name + " " + sql_type for name, sql_type in COLUMN_TYPES.items()
    )
    statement = f"\nCREATE TABLE IF NOT EXISTS {table_name} (\n{columns}\n)\n"

    db = connect(db_name)
    try:
        db.cursor().execute(statement)
        db.commit()
    finally:
        # Close the connection even when the statement fails, so a bad
        # schema definition does not leak an open database handle.
        db.close()
def insert_into_db(db_name, table_name, data):
    """Insert one row into ``table_name`` of the SQLite database ``db_name``.

    Arguments:
    ----------
    db_name:
        Path handed to ``sqlite3.connect``.
    table_name:
        Name of the target table. Interpolated into the SQL text
        verbatim, so it must come from trusted code.
    data: dict
        Mapping of column name -> value. The keys form the column list
        (also interpolated verbatim); the values are bound through ``?``
        placeholders and never formatted into the SQL text.
    """
    db = connect(db_name)
    try:
        print(f"inserting to table {table_name} the following data:")
        pprint(data)

        columns = ",".join(data.keys())
        # One placeholder per supplied value. The count must follow
        # ``data`` (which also provides the column list and the bound
        # values), not the size of the table schema -- otherwise any row
        # with fewer or more keys than the schema fails with a
        # binding-count error.
        placeholders = ", ".join("?" for _ in data)
        insert_query = (
            f"\nINSERT INTO {table_name}\n({columns})\nVALUES ({placeholders})\n"
        )

        db.cursor().execute(insert_query, list(data.values()))
        db.commit()
    finally:
        # Always release the connection, including when the insert fails.
        db.close()
def flatten_dict(d, parent_key="", sep="_"):
    """Collapse a nested dict into a single-level dict.

    Keys of nested dicts are joined to their parent key with ``sep``
    (``{"a": {"b": 1}}`` becomes ``{"a_b": 1}``). A non-empty
    ``parent_key`` is prepended, followed by ``sep``, to every top-level
    key. Key order follows a depth-first walk of ``d``.
    """
    flat = {}
    for key, value in d.items():
        # Only prefix when there is a parent key to prefix with.
        full_key = key if not parent_key else parent_key + sep + key
        if not isinstance(value, dict):
            flat[full_key] = value
            continue
        # Nested mapping: flatten it under the combined key and merge.
        flat.update(flatten_dict(value, full_key, sep=sep))
    return flat
def main(db_path: str):
    """Run one speedtest and store the flattened result in ``db_path``.

    Steps: run the test, turn the ISO-8601 ``timestamp`` field into
    integer seconds since the Unix epoch, flatten the nested result
    dict, make sure the ``speedtest`` table exists, and insert the row.
    """
    table = "speedtest"

    print(f"{datetime.now()} - Starting speedtest...")
    results = run_speedtest()

    print("Fixing some data...")
    # datetime.fromisoformat is given an explicit "+00:00" offset in
    # place of the trailing "Z" before parsing.
    iso_stamp = results["timestamp"].replace("Z", "+00:00")
    results["timestamp"] = int(datetime.fromisoformat(iso_stamp).timestamp())
    results = flatten_dict(results)

    print("Creating database...")
    create_table(db_path, table)

    print("Inserting data into database...")
    insert_into_db(db_path, table, results)
if __name__ == "__main__":
    # Script entry point: read the database path from the command line,
    # run a single measurement and report success.
    from argparse import ArgumentParser

    cli_parser = ArgumentParser()
    cli_parser.add_argument(
        "--db-path",
        required=True,
        help="Path to your SQLite database file. Gets created if it does not exist.",
    )
    cli_args = cli_parser.parse_args()

    main(cli_args.db_path)
    print("SUCCESS! Tested and logged your internet speed! 🚀")