# condition.py

from utils.TranslateObject import translate
from utils.VariableClass import VariableClass
import cv2
import time
import re

# Initialize the VariableClass object, which contains all the necessary
# environment variables.
var = VariableClass()


# Check whether a human-readable condition is met, e.g.:
# - "5 persons detected"
# - "3 cars detected"
# - "2 trucks detected"
def condition_met(results, text=""):
    count_persons = 0
    count_cars = 0
    count_trucks = 0

    # Loop over the boxes and masks.
    # If no masks are found, meaning the model used is not a segmentation
    # model, the mask is set to None.
    for box, mask in zip(results[0].boxes,
                         results[0].masks or [None] * len(results[0].boxes)):
        # Check whether an object was detected.
        # If not, box.id is None and the loop is broken without calling
        # the object-related functions.
        if box.id is None:
            break

        object_name = translate(results[0].names[int(box.cls)])
        if object_name == 'car':
            count_cars += 1
        elif object_name == 'pedestrian':
            count_persons += 1
        elif object_name == 'truck':
            count_trucks += 1

    print(
        f"Persons: {count_persons}, Cars: {count_cars}, Trucks: {count_trucks}")

    counts = {
        "persons": count_persons,
        "cars": count_cars,
        "trucks": count_trucks
    }

    # Parse conditions of the form "<number> <object type> detected".
    match = re.match(r"(\d+) (\w+) detected", text)
    if match:
        number = int(match.group(1))
        object_type = match.group(2)
        if object_type in counts:
            return counts[object_type] >= number
    return False
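
# A minimal sketch of how the condition string is interpreted. The counts
# below are illustrative (a hypothetical run yielding 2 pedestrians and
# 1 car), not output from a real model:
#
#   condition_met(results, "2 persons detected")  # True  (2 >= 2)
#   condition_met(results, "3 persons detected")  # False (2 < 3)
#   condition_met(results, "1 cars detected")     # True  (1 >= 1)
#   condition_met(results, "5 bikes detected")    # False ("bikes" is not counted)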

# Function to process a single frame.
def processFrame(MODEL, frame, video_out, condition):
    # Perform object classification on the frame.
    # persist=True -> the tracking results are stored in the model.
    # persist should be kept True, as this provides unique IDs for each
    # detection. More information about the tracking results:
    # https://docs.ultralytics.com/reference/engine/results/
    total_time_class_prediction = 0
    if var.TIME_VERBOSE:
        start_time_class_prediction = time.time()

    # Execute the model.
    results = MODEL.track(
        source=frame,
        persist=True,
        verbose=False,
        conf=var.CLASSIFICATION_THRESHOLD)
    if var.TIME_VERBOSE:
        total_time_class_prediction += time.time() - start_time_class_prediction

    # ###############################################
    # This is where the custom logic comes into play
    # ###############################################
    # Check that the results are not None;
    # otherwise, skip the postprocessing.
    annotated_frame = frame.copy()
    if results is not None:
        # We can set a condition, e.g. at least 5 persons detected.
        # If the condition is met, the video is forwarded to a remote vault.
        is_condition_met = condition_met(results, condition)
        if is_condition_met:
            print("Condition met, forwarding video to remote vault")
            return frame, total_time_class_prediction, True
        else:
            print("Condition not met, not forwarding video to remote vault")

        # Annotate the frame with the detected objects:
        # draw the class name and the bounding box on the frame.
        if var.SAVE_VIDEO or var.PLOT:
            for box, mask in zip(results[0].boxes,
                                 results[0].masks or [None] * len(results[0].boxes)):
                # Translate the class name to a human-readable format and
                # display it on the frame.
                object_name = translate(results[0].names[int(box.cls)])
                cv2.putText(
                    img=annotated_frame,
                    text=object_name,
                    org=(int(box.xyxy.tolist()[0][0]),
                         int(box.xyxy.tolist()[0][1]) - 10),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=0.5,
                    color=(0, 255, 0),
                    thickness=2)
                # Draw the bounding box on the frame.
                cv2.rectangle(
                    img=annotated_frame,
                    pt1=(int(box.xyxy.tolist()[0][0]),
                         int(box.xyxy.tolist()[0][1])),
                    pt2=(int(box.xyxy.tolist()[0][2]),
                         int(box.xyxy.tolist()[0][3])),
                    color=(0, 255, 0),
                    thickness=2)

    # Depending on the PLOT and SAVE_VIDEO parameters, the annotated frame
    # is shown and/or written to the video writer.
    if var.SAVE_VIDEO or var.PLOT:
        # Show the annotated frame if the PLOT parameter is set to True.
        if var.PLOT:
            cv2.imshow("YOLOv8 Tracking", annotated_frame)
            cv2.waitKey(1)
        # Write the annotated frame to the video writer if the SAVE_VIDEO
        # parameter is set to True.
        if var.SAVE_VIDEO:
            video_out.write(annotated_frame)

    return frame, total_time_class_prediction, False
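
# A minimal usage sketch of how processFrame might be driven from a video
# loop. The YOLO class, the "yolov8n.pt" weights file, the "video.mp4"
# input, and the condition string are assumptions for illustration; the
# real pipeline supplies its own model, frames, and video writer.
if __name__ == "__main__":
    from ultralytics import YOLO  # assumed model class, per the docs link above

    model = YOLO("yolov8n.pt")           # hypothetical weights file
    cap = cv2.VideoCapture("video.mp4")  # hypothetical input video

    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break
        # No video writer in this sketch, so SAVE_VIDEO is assumed False.
        frame, prediction_time, forward = processFrame(
            model, frame, video_out=None, condition="5 persons detected")
        if forward:
            print(f"Condition met after {prediction_time:.2f}s of prediction")
            break

    cap.release()
    cv2.destroyAllWindows()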