-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhost.py
41 lines (36 loc) · 1.29 KB
/
host.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from collections import deque
from datetime import datetime, timedelta
class THost:
def __init__(self, name):
self.name = name
self.errors = {}
self.latest_data = deque(maxlen = 10)
def put(self, data_dict):
find = False
ts = data_dict['ts']
for timestamp in self.errors.keys():
if (ts - timestamp).seconds < 60:
self.errors[timestamp] += 1
find = True
if timestamp < datetime.now() - timedelta(hours=48):
del self.errors[timestamp]
if find == False:
self.errors.update({data_dict['ts']: 1})
self.latest_data.append(data_dict)
def remove(self, data_dict):
ts = data_dict['ts']
items_to_remove = []
for item in self.latest_data:
if item['id'] == data_dict['id']:
items_to_remove.append(item)
for timestamp in self.errors.keys():
if (ts - timestamp).seconds < 60:
self.errors[timestamp] -= 1
for item in items_to_remove:
self.latest_data.remove(item)
def get_count(self, td):
count = 0
for ts in self.errors.keys():
if ts > td:
count += self.errors[ts]
return count