"""computer pointer controller"""
"""
Copyright [2020] [MEHUL SOLANKI]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import sys
import time
import cv2
import numpy as np
import logging as log
import datetime
from argparse import ArgumentParser
from src.input_feeder import InputFeeder
from src.face_detection import face_detection
from src.head_pose_estimation import head_pose_estimation
from src.facial_landmarks_detection import facial_landmarks_detection
from src.gaze_estimation import gaze_estimation_model
from src.mouse_controller import MouseController
# Initialize the log file; saved to the current dir with a datetime suffix
filenameis = "log_" + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + ".txt"
log.basicConfig(filename=filenameis, level=log.DEBUG)
# log syntax samples
# log.debug('This message should go to the log file')
# log.info('So should this')
# log.warning('And this, too')
# Win10 CPU_EXTENSION path for OpenVINO 2019 R3
CPU_EXTENSION = r"C:/Program Files (x86)/IntelSWTools/openvino_2019.3.379/deployment_tools/inference_engine/bin/intel64/Release/cpu_extension_avx2.dll"
log.info("<--- This is autogenerated log file for computer mouse controller application. --->")
log.info("Path of cpu_extension: " + str(CPU_EXTENSION))

def build_argparser():
    """
    Parse command line arguments.
    :return: command line arguments
    """
    parser = ArgumentParser()
    # Model related args.
    parser.add_argument("-fd", "--fd", required=True, type=str,
                        help="Path to an xml file of the face detection model.")
    parser.add_argument("-pt_fd", "--prob_threshold_fd", type=float, default=0.5,
                        help="Probability threshold for filtering face detections "
                             "(0.5 by default)")
    parser.add_argument("-hpe", "--hpe", required=True, type=str,
                        help="Path to an xml file of the head pose estimation model.")
    parser.add_argument("-pt_hpe", "--prob_threshold_hpe", type=float, default=0.5,
                        help="Probability threshold for head pose estimation filtering "
                             "(0.5 by default)")
    parser.add_argument("-fld", "--fld", required=True, type=str,
                        help="Path to an xml file of the facial landmarks detection model.")
    parser.add_argument("-pt_fld", "--prob_threshold_fld", type=float, default=0.5,
                        help="Probability threshold for facial landmarks detection filtering "
                             "(0.5 by default)")
    parser.add_argument("-ge", "--ge", required=True, type=str,
                        help="Path to an xml file of the gaze estimation model.")
    parser.add_argument("-pt_ge", "--prob_threshold_ge", type=float, default=0.5,
                        help="Probability threshold for gaze estimation filtering "
                             "(0.5 by default)")
    # Other args.
    parser.add_argument("-i", "--input", required=True, type=str,
                        help="Path to an image or video file, or CAM for webcam input")
    parser.add_argument("-fps", "--fps", required=False, type=int,
                        help="FPS of the video or webcam, needed for accurate "
                             "duration calculations.")
    parser.add_argument("-l", "--cpu_extension", required=False, type=str,
                        default=CPU_EXTENSION,
                        help="MKLDNN (CPU)-targeted custom layers. Absolute path "
                             "to a shared library with the kernel implementations.")
    parser.add_argument("-d", "--device", type=str, default="CPU",
                        help="Specify the target device to infer on: CPU, GPU, "
                             "FPGA or MYRIAD is acceptable. The sample will look "
                             "for a suitable plugin for the device specified "
                             "(CPU by default)")
    parser.add_argument("-tv", "--toggle_video", type=str, default="ON",
                        help="Toggle the video feed on or off [ON or OFF] "
                             "(ON by default)")
    parser.add_argument("-ci", "--cam_id", type=int, default=0,
                        help="Input web camera id (0 by default)")
    parser.add_argument("-sdo", "--show_debug_output", type=str, default="OFF",
                        help="Toggle the debug output windows on or off [ON or OFF] "
                             "(OFF by default)")
    # Facility not implemented
    # parser.add_argument("-wv", "--write_video", type=str, default="N",
    #                     help="write video to local file Y or N [Y or N]"
    #                     "(on by default)")
    return parser
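
# Example invocation (model and input paths below are illustrative only):
#   python main.py -fd models/face-detection-adas-0001.xml \
#                  -hpe models/head-pose-estimation-adas-0001.xml \
#                  -fld models/landmarks-regression-retail-0009.xml \
#                  -ge models/gaze-estimation-adas-0002.xml \
#                  -i demo.mp4 -d CPU -sdo ON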

def check_input_type(input):
    """
    Check whether the input type is video, image or cam.
    """
    checkInputargs = input  # string from args.input
    checkError = checkInputargs.find(".")  # verify there is an extension, or CAM
    input_type = None
    error_flag = False
    image_flag = False
    if checkInputargs == "CAM":  # check for cam
        input_type = "cam"
        print("Performing inference on webcam video...")
    elif checkError == -1:  # no extension found
        print("Error: invalid input or corrupted file")
        print("Use -h argument for help")
        error_flag = True
    else:
        path, ext = checkInputargs.rsplit(".", 1)  # find extension
        if ext == "bmp" or ext == "jpg":  # supported image extensions
            print("Performing inference on single image...")
            input_type = "image"
            image_flag = True
        elif ext == "mp4" or ext == "MP4":  # if not an image, feed video
            input_type = "video"  # load local stream
            print("Performing inference on local video...")
        else:
            print("Image/video format not supported")
            error_flag = True
    return input_type, error_flag, image_flag
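
# e.g. check_input_type("demo.mp4") -> ("video", False, False)
#      check_input_type("CAM")      -> ("cam", False, False)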

def process_output_face_detection(input_frames_raw, result, input_frames_raw_width, input_frames_raw_height, prob_threshold):
    '''
    Process results and draw bounding boxes onto the frame.
    Returns the frame and a list of boxes [xmin, ymin, xmax, ymax].
    '''
    filtered_result_face_detection = []  # rebuilt per frame
    for box in result[0][0]:  # output shape is 1x1x100x7
        conf = box[2]
        if conf >= prob_threshold:
            xmin = int(box[3] * input_frames_raw_width)
            ymin = int(box[4] * input_frames_raw_height)
            xmax = int(box[5] * input_frames_raw_width)
            ymax = int(box[6] * input_frames_raw_height)
            filtered_result_face_detection.append([xmin, ymin, xmax, ymax])
            # label = "Person"+str(countmultipeople)
            # adding 30px to offset the rectangle from the ROI
            cv2.rectangle(input_frames_raw, (xmin-30, ymin-30), (xmax+30, ymax+30), (0,0,255), 1)  # main rect.
            # cv2.rectangle(input_frames_raw, (xmin, ymin), (xmin+90, ymin+10), (0,0,255), -1)  # text rect.
            # cv2.putText(input_frames_raw, label, (xmin,ymin+10),cv2.FONT_HERSHEY_PLAIN, 0.8, (0,0,255), 1)
    # print(filtered_result_face_detection)  # for debug
    return input_frames_raw, filtered_result_face_detection
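
# The raw detection blob is assumed to follow the usual OpenVINO SSD layout:
# shape [1, 1, N, 7], each row [image_id, label, conf, x_min, y_min, x_max, y_max]
# with coordinates normalized to [0, 1], hence the scaling by frame width/height above:
#   image_id, label, conf, x1, y1, x2, y2 = result[0][0][i]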

def infer(args):
    '''
    This function loads each model, runs inference and controls the cursor.
    '''
    model_load_time = []
    inference_time_fd = []
    inference_time_hpe = []
    inference_time_fld = []
    inference_time_ge = []
    # Initial setup for the face detection model
    model_load_start_time_fd = (time.time() * 1000)  # timer START
    detect_face = face_detection(args.fd, args.device, args.cpu_extension)
    model_load_end_time_fd = (time.time() * 1000) - model_load_start_time_fd  # timer END
    model_load_time.append(model_load_end_time_fd)
    # Initial setup for the head pose estimation model
    model_load_start_time_hpe = (time.time() * 1000)  # timer START
    head_pose_angles = head_pose_estimation(args.hpe, args.device, args.cpu_extension)
    model_load_end_time_hpe = (time.time() * 1000) - model_load_start_time_hpe  # timer END
    model_load_time.append(model_load_end_time_hpe)
    # Initial setup for the facial landmarks detection model
    model_load_start_time_fld = (time.time() * 1000)  # timer START
    facial_landmarks = facial_landmarks_detection(args.fld, args.device, args.cpu_extension)
    model_load_end_time_fld = (time.time() * 1000) - model_load_start_time_fld  # timer END
    model_load_time.append(model_load_end_time_fld)
    # Initial setup for the gaze estimation model
    model_load_start_time_ge = (time.time() * 1000)  # timer START
    gaze_estimation = gaze_estimation_model(args.ge, args.device, args.cpu_extension)
    model_load_end_time_ge = (time.time() * 1000) - model_load_start_time_ge  # timer END
    model_load_time.append(model_load_end_time_ge)
    # Initial setup for the mouse controller
    mouse_auto = MouseController('high', 'fast')
    log.info("Mouse controller initialized")
    # Open inputs
    input_type, error_flag, image_flag = check_input_type(args.input)
    if not error_flag:
        input_feeder = InputFeeder(input_type, args.input, args.cam_id)
    else:
        log.info("Bad inputs, check the input video/image path or cam id")
        print("program stopped")
        sys.exit()
    input_frame_raw_width, input_frame_raw_height = input_feeder.load_data()  # start opencv cap and initialize frame
    if input_frame_raw_width is None or input_frame_raw_width < 90:  # if the input path is wrong
        log.info("Error! Can't read input: check the path, or the image is too small to be inferred on")
        print("program stopped")
        sys.exit()
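    # Per-frame pipeline: detect the face -> crop the face ROI -> head pose
    # angles and eye landmarks from that ROI -> gaze vector from both eye
    # crops plus the angles -> move the pointer by the gaze (x, y).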
    # Run inference
    frame_count = 0
    for input_frames_raw in input_feeder.next_batch():
        frame_count += 1
        if input_frames_raw is None:
            log.info("Input was corrupted at run time or the batch finished, check for the issue")
            log.info("Last frame processed successfully no.: " + str(frame_count))
            print("Program stopped")
            break
        key_pressed = cv2.waitKey(1)
        if key_pressed == 27:
            log.info("program manually terminated")
            print("program manually terminated")
            break
        # get face detection results
        inference_time_start_fd = (time.time() * 1000)  # timer START
        result_face_detection = detect_face.predict(input_frames_raw, input_frame_raw_width, input_frame_raw_height)  # HxW
        inference_time_end_fd = (time.time() * 1000) - inference_time_start_fd  # timer END
        inference_time_fd.append(inference_time_end_fd)
        # get the results filtered by prob threshold, drawn on the raw frame
        face_frame, filtered_result_face_detection = process_output_face_detection(input_frames_raw, result_face_detection, input_frame_raw_width, input_frame_raw_height, args.prob_threshold_fd)
        if not filtered_result_face_detection:  # no face above the threshold in this frame
            continue
        # crop the face ROI with a 20px margin, clamped to the frame [y1:y2, x1:x2]
        xmin, ymin, xmax, ymax = filtered_result_face_detection[0]
        face_roi = face_frame[max(0, ymin - 20):ymax + 20,
                              max(0, xmin - 20):xmax + 20]
        # get the head pose estimation angles
        inference_time_start_hpe = (time.time() * 1000)  # timer START
        result_head_pose_estimation = head_pose_angles.predict(face_roi, face_roi.shape[1], face_roi.shape[0])
        inference_time_end_hpe = (time.time() * 1000) - inference_time_start_hpe  # timer END
        inference_time_hpe.append(inference_time_end_hpe)
        # extract the angles from their respective output blobs
        yaw = result_head_pose_estimation['angle_y_fc'][0][0]
        pitch = result_head_pose_estimation['angle_p_fc'][0][0]
        roll = result_head_pose_estimation['angle_r_fc'][0][0]
        # print("head pose estimation angles yaw, pitch, roll in degrees: ", yaw, pitch, roll)
        # build the [1x3] vector for the gaze estimation model
        vector_yaw_pitch_roll = [yaw, pitch, roll]
        # get the result of the facial landmarks detection model
        # the input is the face ROI only
        inference_time_start_fld = (time.time() * 1000)  # timer START
        result_facial_landmarks = facial_landmarks.predict(face_roi, face_roi.shape[1], face_roi.shape[0])  # HxW
        inference_time_end_fld = (time.time() * 1000) - inference_time_start_fld  # timer END
        inference_time_fld.append(inference_time_end_fld)
        # print(result_facial_landmarks.shape)  # get shape
        # the landmarks blob is 1x10x1x1, five (x, y) pairs normalized to the
        # ROI; slice away the last two dims to get a 1x10 vector
        result_facial_landmarks = result_facial_landmarks[::, ::, 0, 0]
        # print(result_facial_landmarks)  # print vector
        # print(result_facial_landmarks.shape)  # get new shape
        # debug
        # print("eye coords raw: ", result_facial_landmarks[0][0], result_facial_landmarks[0][1], result_facial_landmarks[0][2], result_facial_landmarks[0][3])
        # scale the left and right eye points to ROI pixels
        left_eye_point_x = int(result_facial_landmarks[0][0] * face_roi.shape[1])
        left_eye_point_y = int(result_facial_landmarks[0][1] * face_roi.shape[0])
        right_eye_point_x = int(result_facial_landmarks[0][2] * face_roi.shape[1])
        right_eye_point_y = int(result_facial_landmarks[0][3] * face_roi.shape[0])
        # debug
        # print("eye coords for roi px: ", left_eye_point_x, left_eye_point_y, right_eye_point_x, right_eye_point_y)
        # left and right eye ROIs (50x50 px around each eye point, clamped to
        # the ROI) for the gaze estimation model
        left_eye_roi = face_roi[max(0, left_eye_point_y - 25):left_eye_point_y + 25,
                                max(0, left_eye_point_x - 25):left_eye_point_x + 25]
        right_eye_roi = face_roi[max(0, right_eye_point_y - 25):right_eye_point_y + 25,
                                 max(0, right_eye_point_x - 25):right_eye_point_x + 25]
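        # The gaze model is assumed to take both eye crops plus the head pose
        # angles and return a 1x3 gaze vector (x, y, z) in camera space; only
        # x and y are used below to drive the pointer:
        #   gx, gy = result_gaze_estimation[0][0], result_gaze_estimation[0][1]
        #   mouse_auto.move(gx, gy)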
        # get the result from the gaze estimation model
        inference_time_start_ge = (time.time() * 1000)  # timer START
        result_gaze_estimation = gaze_estimation.predict(left_eye_roi, right_eye_roi, vector_yaw_pitch_roll)
        inference_time_end_ge = (time.time() * 1000) - inference_time_start_ge  # timer END
        inference_time_ge.append(inference_time_end_ge)
        # print("gaze estimation result shape: ", result_gaze_estimation.shape)  # [1x3]
        # print("gaze estimation values", result_gaze_estimation)
        # control the mouse
        # x = int(result_gaze_estimation[0][0] * left_eye_roi.shape[0])
        # y = int(result_gaze_estimation[0][1] * left_eye_roi.shape[0])
        # print("mouse value x,y: ", x, y)
        # mouse_auto.move(0, 0)
        mouse_auto.move(result_gaze_estimation[0][0], result_gaze_estimation[0][1])
        # draw circles on the eye points to visualize the model output
        cv2.circle(face_roi, (left_eye_point_x, left_eye_point_y), 10, (0,255,255), 1)
        cv2.circle(face_roi, (right_eye_point_x, right_eye_point_y), 10, (0,255,255), 1)
        # draw rectangles around the eye areas, offset 30px for a slightly bigger rect.
        # print("eye coords for roi: ", left_eye_point_x, left_eye_point_y, right_eye_point_x, right_eye_point_y)
        cv2.rectangle(face_roi, (left_eye_point_x-30, left_eye_point_y-30), (left_eye_point_x+30, left_eye_point_y+30), (0,255,255), 1)
        cv2.rectangle(face_roi, (right_eye_point_x-30, right_eye_point_y-30), (right_eye_point_x+30, right_eye_point_y+30), (0,255,255), 1)
        # show the video feed or write an image file
        if not image_flag and args.show_debug_output == 'ON':
            # visualization
            cv2.namedWindow('input_feed', cv2.WINDOW_NORMAL)
            cv2.imshow('input_feed', input_frames_raw)
            cv2.namedWindow('lefteye', cv2.WINDOW_NORMAL)
            cv2.imshow('lefteye', left_eye_roi)
            cv2.namedWindow('righteye', cv2.WINDOW_NORMAL)
            cv2.imshow('righteye', right_eye_roi)
            cv2.namedWindow('face_roi', cv2.WINDOW_NORMAL)
            cv2.imshow('face_roi', face_roi)
        else:
            # write an output image in single-image mode
            cv2.imwrite('output_image.jpg', face_frame)
            print("Image saved successfully!")
    # Print stats
    print("----- Benchmark results -----")
    print("Model name: [fd, hpe, fld, ge]")
    print("Model load time in ms: ", model_load_time)
    print("[Min Max Avg.]")
    log_inference_time_fd = np.array(inference_time_fd)
    print("Inference time log for the face detection model in ms:")
    print([log_inference_time_fd.min(), log_inference_time_fd.max(), float("{:.2f}".format(np.average(log_inference_time_fd)))])
    log_inference_time_hpe = np.array(inference_time_hpe)
    print("Inference time log for the head pose estimation model in ms:")
    print([log_inference_time_hpe.min(), log_inference_time_hpe.max(), float("{:.2f}".format(np.average(log_inference_time_hpe)))])
    log_inference_time_fld = np.array(inference_time_fld)
    print("Inference time log for the facial landmarks detection model in ms:")
    print([log_inference_time_fld.min(), log_inference_time_fld.max(), float("{:.2f}".format(np.average(log_inference_time_fld)))])
    log_inference_time_ge = np.array(inference_time_ge)
    print("Inference time log for the gaze estimation model in ms:")
    print([log_inference_time_ge.min(), log_inference_time_ge.max(), float("{:.2f}".format(np.average(log_inference_time_ge)))])
    cv2.destroyAllWindows()
    return

def main():
    """
    Run the inference with all four models.
    """
    # Grab command line args.
    # argparse exposes the full option names here, so use args.fd, not short .m style attributes.
    args = build_argparser().parse_args()
    print("Command line arguments received")
    print("-----Information-----")
    print("Model path_fd:", args.fd)
    print("Confidence_fd:", args.prob_threshold_fd)
    print("Model path_hpe:", args.hpe)
    print("Confidence_hpe:", args.prob_threshold_hpe)
    print("Model path_fld:", args.fld)
    print("Confidence_fld:", args.prob_threshold_fld)
    print("Model path_ge:", args.ge)
    print("Confidence_ge:", args.prob_threshold_ge)
    print("Video/Image path:", args.input)
    print("Video fps:", args.fps)
    print("Device:", args.device)
    print("CPU Ext. path:", args.cpu_extension)
    print("Web cam ID (if any):", args.cam_id)
    print("Toggle video feed on/off:", args.toggle_video)
    print("Show debug output screen:", args.show_debug_output)
    # print("Write output to video file Y or N:", args.write_video)
    print("-----------------------")
    # Perform inference on the input stream
    infer(args)


if __name__ == '__main__':
    main()