-
Notifications
You must be signed in to change notification settings - Fork 237
/
Copy pathintegration_test.py
executable file
·154 lines (132 loc) · 4.43 KB
/
integration_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python3
import os
import signal
import subprocess
import time
import itertools
from threading import Timer
import atexit
import logging
import argparse
from argparse import ArgumentParser
def parse_args():
epilog_text = '''
Integration test for DepthAI.
Generates all combinations of streams defined in "streams", runs each of them for maximum of "timeout" seconds.
The logs are written into integration_test.log.
Example usage: python3 integration_test.py -usb=2 -to=30
python3 integration_test.py -usb=3 -to=60
'''
parser = ArgumentParser(epilog=epilog_text,formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("-usb", "--usb_version", default=3,
type=int, required=False,
help="USB version on which to perform tests.")
parser.add_argument("-to", "--timeout_sec", default=30, type=int,
help="Timeout in seconds for each stream combination. [MAX time allowed to run each test.]")
options = parser.parse_args()
return options
global args
try:
args = vars(parse_args())
except:
os._exit(2)
global USB_version
global timeout_sec
if args['usb_version']:
USB_version = args['usb_version']
print("USB_version: "+str(USB_version))
if args['timeout_sec']:
timeout_sec = args['timeout_sec']
print("timeout: "+str(timeout_sec) + " seconds")
logger = logging.getLogger('integration_test')
hdlr = logging.FileHandler('./integration_test.log', 'w')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.INFO)
def kill_proc(proc, timeout):
timeout["value"] = True
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
def cleanup():
global run
run=False
if(p is not None):
print('Stopping subprocess with pid: ', str(p.pid))
os.killpg(os.getpgid(p.pid), signal.SIGTERM)
print('Stopped!')
#todo merge with streams
stream_size = {
"previewout": 300*300*3*30,
"metaout" : 5 * 1024,
"left" : 1280*720*1*30,
"right" : 1280*720*1*30,
"depth_sipp" : 1280*720*2*30,
}
streams = [
"previewout",
"metaout",
"left",
"right",
"depth_sipp"]
global gl_limit_fps
if USB_version==2:
gl_limit_fps=True
usb_bandwith=29*1024*1024
gl_max_fps=12.0
else:
gl_limit_fps=False
global p
global return_code
p=None
atexit.register(cleanup)
config_ovewrite_cmd = """-co '{"streams": ["""
for L in range(0, len(streams)+1):
for subset in itertools.combinations(streams, L):
config_builder=config_ovewrite_cmd
total_stream_throughput=0
for comb1 in subset:
total_stream_throughput+=stream_size[comb1]
# print("total_stream_throughput",total_stream_throughput, "usb_bandwith", usb_bandwith)
limit_fps=None
if gl_limit_fps:
if total_stream_throughput>usb_bandwith:
limit_fps=True
else:
limit_fps=False
else:
limit_fps=False
# print("limiting fps", limit_fps)
for comb in subset:
# print (subset.index(comb),comb, stream_size[comb])
separator=None
if subset.index(comb) is 0:
separator=''
else:
separator=','
if comb is not "metaout" and limit_fps:
#todo dynamic max fps
max_fps = gl_max_fps
comb2 = '{"name": "' +comb+'", "max_fps": ' +str(max_fps)+ '}'
config_builder = config_builder+separator+comb2
else:
separator+='"'
config_builder = config_builder+separator+comb+'"'
config_builder = config_builder + """]}'"""
if subset:
cmd = "python3 depthai.py " + config_builder
if(config_builder == """-co '{"streams": ["metaout"]}'"""):
continue
logger.info(cmd)
p = subprocess.Popen(cmd, shell=True, preexec_fn=os.setsid)
timeout = {"value": False}
timer = Timer(timeout_sec, kill_proc, [p, timeout])
timer.start()
p.wait()
timer.cancel()
return_code = p.returncode
p=None
if(timeout["value"]):
logger.info("returned succesfully")
else:
logger.info("returned with error code: " + str(return_code))
time.sleep(5)