diff --git a/.env b/.env index af17485..8bea6c7 100644 --- a/.env +++ b/.env @@ -1,11 +1,18 @@ # Environment variables # Model parameters -MODEL_NAME = "yolov8n.pt" -CONDITION = "1 persons detected" # or "5 cars detected" +MODEL_NAME="yolov8n.pt" +MODEL_NAME_2="your_second_model.pt" +MODEL_ALLOWED_CLASSES="0" +MODEL_2_ALLOWED_CLASSES="0" + +#Dataset parameters +DATASET_FORMAT="base" +DATASET_VERSION="1" +DATASET_UPLOAD="True" # Forwarding -FORWARDING_MEDIA = "True" -REMOVE_AFTER_PROCESSED = "True" +FORWARDING_MEDIA="True" +REMOVE_AFTER_PROCESSED="True" # Queue parameters QUEUE_NAME="data-harvesting" @@ -20,22 +27,35 @@ STORAGE_URI="https://vault.xxx.xx/api" STORAGE_ACCESS_KEY="xxxx" STORAGE_SECRET_KEY="xxx" +#Integration parameters +INTEGRATION_NAME="roboflow" + # Roboflow parameters -RBF_API_KEY = "xxx" -RBF_WORKSPACE = "xxx" -RBF_PROJECT = "xxx" +RBF_API_KEY="xxx" +RBF_WORKSPACE="xxx" +RBF_PROJECT="xxx" + +#S3 parameters +S3_ENDPOINT="xxx" +S3_ACCESS_KEY="xxx" +S3_SECRET_KEY="xxx" +S3_BUCKET="xxx" # Feature parameters -PLOT = "True" -SAVE_VIDEO = "True" -MEDIA_SAVEPATH = "data/video/video_in.mp4" -OUTPUT_MEDIA_SAVEPATH = "data/video/video_out.mp4" +PLOT="True" +MEDIA_SAVEPATH="data/video/video_in.mp4" +SAVE_VIDEO="True" +OUTPUT_MEDIA_SAVEPATH="data/video/video_out.mp4" +REMOVE_AFTER_PROCESSED="False" -TIME_VERBOSE = "True" -LOGGING = "True" +TIME_VERBOSE="True" +LOGGING="True" # Classification parameters -CLASSIFICATION_FPS = "5" -CLASSIFICATION_THRESHOLD = "0.2" -MAX_NUMBER_OF_PREDICTIONS = "100" -ALLOWED_CLASSIFICATIONS = "0, 1, 2, 3, 4, 5, 6, 7, 8, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 26, 28" +CLASSIFICATION_FPS="5" +CLASSIFICATION_THRESHOLD="0.2" +MAX_NUMBER_OF_PREDICTIONS="100" +FRAMES_SKIP_AFTER_DETECT="50" +ALLOWED_CLASSIFICATIONS="0,1,2,3,4,5,6,7,8,14,15,16,17,18,19,20,21,22,23,24,26,28" +MIN_DETECTIONS="1" +IOU="0.85" diff --git a/Dockerfile b/Dockerfile index 28246a8..6a15d20 100644 --- a/Dockerfile +++ b/Dockerfile @@ -90,7 +90,9 @@ ENV 
MIN_DISTANCE "" ENV MIN_STATIC_DISTANCE "" ENV MIN_DETECTIONS "" ENV ALLOWED_CLASSIFICATIONS "0, 1, 2, 3, 5, 7, 14, 15, 16, 24, 26, 28" - +ENV IOU "" +ENV FRAMES_SKIP_AFTER_DETECT "" +ENV MIN_DETECTIONS "" # Run the application ENTRYPOINT ["python" , "queue_harvesting.py"] \ No newline at end of file diff --git a/condition.py b/condition.py index 1bd05d8..3f1df1b 100644 --- a/condition.py +++ b/condition.py @@ -1,4 +1,3 @@ - from utils.TranslateObject import translate from utils.VariableClass import VariableClass import cv2 @@ -7,10 +6,11 @@ # Initialize the VariableClass object, which contains all the necessary environment variables. var = VariableClass() + # Function to process the frame. -def processFrame(MODEL, MODEL2, frame, video_out='', frames_out=''): +def process_frame(MODEL, MODEL2, frame, condition_func, mapping, video_out='', frames_out=''): # Perform object classification on the frame. # persist=True -> The tracking results are stored in the model. # persist should be kept True, as this provides unique IDs for each detection. @@ -25,16 +25,18 @@ def processFrame(MODEL, MODEL2, frame, video_out='', frames_out=''): source=frame, persist=True, verbose=False, - iou=0.85, - conf=var.CLASSIFICATION_THRESHOLD) + iou=var.IOU, + conf=var.CLASSIFICATION_THRESHOLD, + classes=var.MODEL_ALLOWED_CLASSES) results2 = None if MODEL2: results2 = MODEL2.track( source=frame, persist=True, verbose=False, - iou=0.85, - conf=var.CLASSIFICATION_THRESHOLD) + iou=var.IOU, + conf=var.CLASSIFICATION_THRESHOLD, + classes=var.MODEL_2_ALLOWED_CLASSES) results2 = results2[0] if var.TIME_VERBOSE: @@ -46,32 +48,13 @@ def processFrame(MODEL, MODEL2, frame, video_out='', frames_out=''): # Check if the results are not None, #  Otherwise, the postprocessing should not be done. # Iterate over the detected objects and their masks. 
- results = results[0] # Pick the first element since it returned a list of Result not the object itself + results = results[0] # Pick the first element since it returned a list of Result not the object itself annotated_frame = frame.copy() # Empty frame containing labels with bounding boxes labels_and_boxes = '' - # if results is not None: - # # Using the results of the classification, we can verify if we have a condition met. - # # We can look for example for people who are: - # # - not wearing a helmet, - # # - people with a blue shirt, - # # - cars driving in the opposite direction, - # # - etc. - # # You are in the driving seat so you can write your custom code to detect the condition - # # you are looking for. - # if len(results.boxes) >= var.MIN_DETECTIONS: # If there are at least 5 boxes found (Could belong to either class) - # print("Condition met, we are gathering the labels and boxes and return results") - # # Extract label and boxes from result in YOLOv8 format - # for cls_item, xywhn_item in zip(results.boxes.cls.tolist(), results.boxes.xywhn): - # labels_and_boxes = labels_and_boxes + f'{int(cls_item)} {xywhn_item[0]} {xywhn_item[1]} {xywhn_item[2]} {xywhn_item[3]}\n' - # - # return frame, total_time_class_prediction, True, labels_and_boxes - # else: - # print("Condition not met, skipping frame") - if results is not None or results2 is not None: combined_results = [] @@ -80,28 +63,33 @@ def processFrame(MODEL, MODEL2, frame, video_out='', frames_out=''): # Valid image need to: # + Have at least MIN_DETECTIONS objects detected: # + Have to have helmet (since we are lacking of helmet dataset) - # + Number of helmet and person detected are equal (make sure every person wearing a helmet is detected) - if (len(results.boxes) > 0 - and len(results2.boxes) > 0 - and (any(box.cls == 1 for box in results2.boxes) - or any(box.cls == 2 for box in results.boxes)) - and sum(box.cls == 1 for box in results.boxes) == sum(box.cls == 2 for box in results.boxes)): - 
for box1, box2 in zip(results.boxes, results2.boxes): - if box1.cls == box2.cls: - avg_conf = (box1.conf + box2.conf) / 2 - if box1.conf >= box2.conf: - combined_results.append((box1.xywhn, box1.cls, avg_conf)) - else: - combined_results.append((box2.xywhn, box2.cls, avg_conf)) - - # Add any remaining boxes from model 1 or model 2 if their counts are different - combined_results += [(box.xywhn, box.cls, box.conf) for box in results.boxes[len(combined_results):]] - combined_results += [(box.xywhn, box.cls, box.conf) for box in results2.boxes[len(combined_results):]] - - if len(combined_results) >= var.MIN_DETECTIONS: # If the combined result has at least 5 boxes found (Could belong to either class) + if condition_func(results, results2, mapping): + # Add labels and boxes of model 1 (add using mapping since we will store the label of model 2) + combined_results += [(box.xywhn, mapping[int(box.cls)], box.conf) for box in results.boxes] + + # Add labels and boxes of model 2 + combined_results += [(box2.xywhn, box2.cls, box2.conf) for box2 in results2.boxes] + + # sort results based on descending confidences + sorted_combined_results = sorted(combined_results, key=lambda x: x[2], reverse=True) + + # Remove duplicates (if x and y coordinates of 2 boxes with the same class are < 0.01 + # -> consider as duplication and remove + combined_results = [] + for element in sorted_combined_results: + add_flag = True + for res in combined_results: + if res[1] == element[1]: + if (abs(res[0][0][0] - element[0][0][0]) < 0.01 + and (abs(res[0][0][1] - element[0][0][1]) < 0.01)): + add_flag = False + if add_flag: + combined_results.append(element) + + if len(combined_results) >= var.MIN_DETECTIONS: # If the combined result has at least MIN_DETECTIONS boxes found (Could belong to either class) print("Condition met, we are gathering the labels and boxes and return results") - for xywhn, cls, conf in combined_results: - labels_and_boxes += f'{int(cls[0])} {xywhn[0, 0].item()} {xywhn[0, 
1].item()} {xywhn[0, 2].item()} {xywhn[0, 3].item()}\n' + for xywhn, cls, _ in combined_results: + labels_and_boxes += f'{int(cls)} {xywhn[0, 0].item()} {xywhn[0, 1].item()} {xywhn[0, 2].item()} {xywhn[0, 3].item()}\n' return frame, total_time_class_prediction, True, labels_and_boxes # Annotate the frame with the classification objects. @@ -132,15 +120,16 @@ def processFrame(MODEL, MODEL2, frame, video_out='', frames_out=''): # Depending on the SAVE_VIDEO or PLOT parameter, the frame is annotated. # This is done using a custom annotation function. - if var.SAVE_VIDEO or var.PLOT: - - # Show the annotated frame if the PLOT parameter is set to True. - cv2.imshow("YOLOv8 Tracking", - annotated_frame) if var.PLOT else None - cv2.waitKey(1) if var.PLOT else None - - # Write the annotated frame to the video-writer if the SAVE_VIDEO parameter is set to True. - video_out.write( - annotated_frame) if var.SAVE_VIDEO else None + # TODO: Fix this later (for some reasons code has error but vid is still saved) + # if var.SAVE_VIDEO or var.PLOT: + # + # # Show the annotated frame if the PLOT parameter is set to True. + # cv2.imshow("YOLOv8 Tracking", + # annotated_frame) if var.PLOT else None + # cv2.waitKey(1) if var.PLOT else None + # + # # Write the annotated frame to the video-writer if the SAVE_VIDEO parameter is set to True. 
+ # video_out.write( + # annotated_frame) if var.SAVE_VIDEO else None return frame, total_time_class_prediction, False, labels_and_boxes diff --git a/exports/base_export.py b/exports/base_export.py new file mode 100644 index 0000000..6957583 --- /dev/null +++ b/exports/base_export.py @@ -0,0 +1,51 @@ +from exports.ibase_export import IBaseExport +from utils.VariableClass import VariableClass +from os.path import ( + join as pjoin, + dirname as pdirname, + abspath as pabspath, +) +import os +import time + + +class BaseExport(IBaseExport): + def __init__(self, proj_dir_name): + self._var = VariableClass() + _cur_dir = pdirname(pabspath(__file__)) + self.proj_dir = pjoin(_cur_dir, f'../data/{proj_dir_name}') + self.proj_dir = pabspath(self.proj_dir) # normalise the link + self.result_dir_path = None + + def initialize_save_dir(self): + """ + See ibase_project.py + + Returns: + None + """ + self.result_dir_path = pjoin(self.proj_dir, f'{self._var.DATASET_FORMAT}-v{self._var.DATASET_VERSION}') + os.makedirs(self.result_dir_path, exist_ok=True) + + if os.path.exists(self.result_dir_path): + print('Successfully initialize save directory!') + return True + else: + print('Something wrong happened!') + return False + + def save_frame(self, frame, predicted_frames, cv2, labels_and_boxes): + print(f'5.1. Condition met, processing valid frame: {predicted_frames}') + # Save original frame + unix_time = int(time.time()) + print("5.2. Saving frame, labels and boxes") + cv2.imwrite( + f'{self.result_dir_path}/{unix_time}.png', + frame) + # Save labels and boxes + with open(f'{self.result_dir_path}/{unix_time}.txt', + 'w') as my_file: + my_file.write(labels_and_boxes) + + # Increase the frame_number and predicted_frames by one. 
+ return predicted_frames + 1 diff --git a/exports/export_factory.py b/exports/export_factory.py new file mode 100644 index 0000000..6d2ebf9 --- /dev/null +++ b/exports/export_factory.py @@ -0,0 +1,21 @@ +from exports.base_export import BaseExport +from exports.yolov8_export import Yolov8Export +from utils.VariableClass import VariableClass + + +class ExportFactory: + """ + Export Factory initializes specific export types. + """ + def __init__(self): + self._var = VariableClass() + self.save_format = self._var.DATASET_FORMAT + + def init(self, proj_name): + if self.save_format == 'yolov8': + return Yolov8Export(proj_name) + elif self.save_format == 'base': + return BaseExport(proj_name) + else: + raise ModuleNotFoundError('Export type not found!') + diff --git a/exports/ibase_export.py b/exports/ibase_export.py new file mode 100644 index 0000000..e208810 --- /dev/null +++ b/exports/ibase_export.py @@ -0,0 +1,12 @@ +from abc import ABC, abstractmethod + + +class IBaseExport(ABC): + + @abstractmethod + def initialize_save_dir(self): + pass + + @abstractmethod + def save_frame(self, frame, predicted_frames, cv2, labels_and_boxes): + pass diff --git a/exports/iyolov8_export.py b/exports/iyolov8_export.py new file mode 100644 index 0000000..977dca8 --- /dev/null +++ b/exports/iyolov8_export.py @@ -0,0 +1,16 @@ +from abc import ABC, abstractmethod + + +class IYolov8Export(ABC): + + @abstractmethod + def initialize_save_dir(self): + pass + + @abstractmethod + def save_frame(self, frame, predicted_frames, cv2, labels_and_boxes): + pass + + @abstractmethod + def create_yaml(self, model2): + pass diff --git a/exports/yolov8_export.py b/exports/yolov8_export.py new file mode 100644 index 0000000..01dcc0d --- /dev/null +++ b/exports/yolov8_export.py @@ -0,0 +1,83 @@ +from exports.iyolov8_export import IYolov8Export +from utils.VariableClass import VariableClass +from os.path import ( + join as pjoin, + dirname as pdirname, + abspath as pabspath, +) +import os +import time + + 
+class Yolov8Export(IYolov8Export): + def __init__(self, proj_dir_name): + """ + Constructor. + """ + self._var = VariableClass() + _cur_dir = pdirname(pabspath(__file__)) + self.proj_dir = pjoin(_cur_dir, f'../data/{proj_dir_name}') + self.proj_dir = pabspath(self.proj_dir) # normalise the link + self.image_dir_path = None + self.label_dir_path = None + self.yaml_path = None + self.result_dir_path = None + + def initialize_save_dir(self): + """ + See ibase_project.py + + Returns: + None + """ + self.result_dir_path = pjoin(self.proj_dir, f'{self._var.DATASET_FORMAT}-v{self._var.DATASET_VERSION}') + os.makedirs(self.result_dir_path, exist_ok=True) + + self.image_dir_path = pjoin(self.result_dir_path, 'images') + os.makedirs(self.image_dir_path, exist_ok=True) + + self.label_dir_path = pjoin(self.result_dir_path, 'labels') + os.makedirs(self.label_dir_path, exist_ok=True) + + self.yaml_path = pjoin(self.result_dir_path, 'data.yaml') + + if (os.path.exists(self.result_dir_path) + and os.path.exists(self.image_dir_path) + and os.path.exists(self.label_dir_path)): + print('Successfully initialize save directory!') + return True + else: + print('Something wrong happened!') + return False + + def save_frame(self, frame, predicted_frames, cv2, labels_and_boxes): + print(f'5.1. Condition met, processing valid frame: {predicted_frames}') + # Save original frame + unix_time = int(time.time()) + print("5.2. Saving frame, labels and boxes") + cv2.imwrite( + f'{self.image_dir_path}/{unix_time}.png', + frame) + # Save labels and boxes + with open(f'{self.label_dir_path}/{unix_time}.txt', + 'w') as my_file: + my_file.write(labels_and_boxes) + + # Increase the frame_number and predicted_frames by one. + return predicted_frames + 1 + + def create_yaml(self, model2): + """ + Create YAML configuration file with DATASET_FORMAT format. 
+ As convention, class names of YAML file is configured based on model2 + + Returns: + None + """ + label_names = [name for name in list(model2.names.values())] + with open(self.yaml_path, 'w') as my_file: + content = 'names:\n' + for name in label_names: + content += f'- {name}\n' # class mapping for helmet detection project + content += f'nc: {len(label_names)}' + my_file.write(content) diff --git a/integrations/clearML_integration.py b/integrations/clearML_integration.py new file mode 100644 index 0000000..e69de29 diff --git a/integrations/integration_factory.py b/integrations/integration_factory.py new file mode 100644 index 0000000..4e3124b --- /dev/null +++ b/integrations/integration_factory.py @@ -0,0 +1,19 @@ +from integrations.roboflow_integration import RoboflowIntegration +from integrations.s3_integration import S3Integration +from utils.VariableClass import VariableClass + + +class IntegrationFactory: + def __init__(self): + self._var = VariableClass() + self.name = self._var.INTEGRATION_NAME + + def init(self): + if self.name == 'roboflow': + print('Initializing Roboflow agent ...') + return RoboflowIntegration() + elif self.name == 's3': + print('Initializing S3 compatible agent ...') + return S3Integration() + else: + raise ModuleNotFoundError('Integration type not found!') diff --git a/integrations/iroboflow_integration.py b/integrations/iroboflow_integration.py new file mode 100644 index 0000000..d58b2c7 --- /dev/null +++ b/integrations/iroboflow_integration.py @@ -0,0 +1,12 @@ +from abc import ABC, abstractmethod + + +class IRoboflowIntegration(ABC): + + @abstractmethod + def upload_dataset(self, src_project_path): + pass + + @abstractmethod + def __connect__(self): + pass diff --git a/integrations/is3_integration.py b/integrations/is3_integration.py new file mode 100644 index 0000000..2858a87 --- /dev/null +++ b/integrations/is3_integration.py @@ -0,0 +1,20 @@ +from abc import ABC, abstractmethod + + +class IS3Integration(ABC): + + 
@abstractmethod + def upload_file(self, source_path, output_path): + pass + + @abstractmethod + def upload_dataset(self, src_project_path): + pass + + @abstractmethod + def __connect__(self): + pass + + @abstractmethod + def __check_bucket_exists__(self, bucket_name): + pass diff --git a/connections/roboflow_helper.py b/integrations/roboflow_integration.py similarity index 58% rename from connections/roboflow_helper.py rename to integrations/roboflow_integration.py index d2753ae..f04c764 100644 --- a/connections/roboflow_helper.py +++ b/integrations/roboflow_integration.py @@ -6,22 +6,21 @@ from utils.VariableClass import VariableClass -var = VariableClass() -class RoboflowHelper: +class RoboflowIntegration: def __init__(self): - self.agent, self.ws, self.project = self.__login__ + self._var = VariableClass() + self.agent, self.ws, self.project = self.__connect__() - @property - def __login__(self): + def __connect__(self): try: # Attempt to initialize Roboflow with the API key - agent = roboflow.Roboflow(api_key=var.ROBOFLOW_API_KEY) + agent = roboflow.Roboflow(api_key=self._var.ROBOFLOW_API_KEY) # Access the workspace - workspace = agent.workspace(var.ROBOFLOW_WORKSPACE) + workspace = agent.workspace(self._var.ROBOFLOW_WORKSPACE) # Access the project - project = workspace.project(var.ROBOFLOW_PROJECT) + project = workspace.project(self._var.ROBOFLOW_PROJECT) return agent, workspace, project @@ -30,8 +29,8 @@ def __login__(self): raise ConnectionRefusedError(f'Error during Roboflow login: {e}') def upload_dataset(self, src_project_path): - # Upload data set to an existing project - self.ws.upload_dataset( + # Upload data set to an existing project + self.ws.upload_dataset( src_project_path, pbasename(self.project.id), num_workers=10, @@ -39,8 +38,8 @@ def upload_dataset(self, src_project_path): project_type="object-detection", batch_name=None, num_retries=0 - ) - print('Uploaded') + ) + print('Uploaded') - # Remove local folder when uploaded - 
shutil.rmtree(src_project_path) \ No newline at end of file + # Remove local folder when uploaded + shutil.rmtree(src_project_path) diff --git a/integrations/s3_integration.py b/integrations/s3_integration.py new file mode 100644 index 0000000..d341ecf --- /dev/null +++ b/integrations/s3_integration.py @@ -0,0 +1,70 @@ +import boto3 +import os + +from utils.VariableClass import VariableClass + + +class S3Integration: + def __init__(self): + self._var = VariableClass() + self.session, self.agent = self.__connect__() + self.bucket = self._var.S3_BUCKET + self.__check_bucket_exists__(self.bucket) + + def __connect__(self): + session = boto3.session.Session() + # Connect to Wasabi S3 + agent = session.client( + self._var.INTEGRATION_NAME, + endpoint_url=self._var.S3_ENDPOINT, # Wasabi endpoint URL + aws_access_key_id=self._var.S3_ACCESS_KEY, + aws_secret_access_key=self._var.S3_SECRET_KEY, + ) + print('Connected!') + + return session, agent + + def upload_file(self, source_path, output_path): + try: + self.agent.upload_file(source_path, self.bucket, output_path) + print(f"Successfully uploaded '{source_path}' to 's3://{self.bucket}/{output_path}'") + except Exception as e: + print(f"Failed to upload '{source_path}' to 's3://{self.bucket}/{output_path}': {e}") + + # def upload_dataset(self, src_project_path): + # # Iterate over all the files in the folder + # for root, dirs, files in os.walk(src_project_path): + # for filename in files: + # # Construct the full file path + # source_path = os.path.join(root, filename) + # + # output_path = f'{self._var.DATASET_FORMAT}-v{self._var.DATASET_VERSION}/{filename}' + # # Upload the file + # self.upload_file(source_path, output_path) + # print(f'Uploaded: {source_path} to s3://{self.bucket}/{output_path}') + + def upload_dataset(self, src_project_path): + # Iterate over all the files in the folder, including sub folders + for root, dirs, files in os.walk(src_project_path): + for filename in files: + # Construct the full file 
path + source_path = os.path.join(root, filename) + + # Preserve the folder structure in the S3 path + # Create the relative path from the source folder to the current file + relative_path = os.path.relpath(source_path, src_project_path) + + # Construct the output path using DATASET_FORMAT and DATASET_VERSION, including the relative path + output_path = f"{self._var.DATASET_FORMAT}-v{self._var.DATASET_VERSION}/{relative_path.replace(os.sep, '/')}" + + # Upload the file + self.upload_file(source_path, output_path) + print(f'Uploaded: {source_path} to s3://{self.bucket}/{output_path}') + + def __check_bucket_exists__(self, bucket_name): + try: + self.agent.head_bucket(Bucket=bucket_name) + print(f"Bucket '{bucket_name}' found.") + + except: + raise ModuleNotFoundError(f"Bucket '{bucket_name}' does not exist.") diff --git a/projects/base_project.py b/projects/base_project.py new file mode 100644 index 0000000..dbe113b --- /dev/null +++ b/projects/base_project.py @@ -0,0 +1,60 @@ +from os.path import ( + join as pjoin, + dirname as pdirname, + abspath as pabspath +) +from projects.ibase_project import IBaseProject +from utils.VariableClass import VariableClass + + +class BaseProject(IBaseProject): + """ + Base Project that implements common functions, every project should inherit this. + """ + + def __init__(self): + """ + Constructor. + """ + self._var = VariableClass() + self.proj_dir = None + + def condition_func(self, results1, results2, mapping): + """ + See ibase_project.py + """ + raise NotImplemented('Should override this!!!') + + def class_mapping(self, results1, results2): + """ + See ibase_project.py + """ + raise NotImplemented('Should override this!!!') + + def create_proj_save_dir(self, dir_name): + """ + See ibase_project.py + + Returns: + None + """ + _cur_dir = pdirname(pabspath(__file__)) + self.proj_dir = pjoin(_cur_dir, f'../data/{dir_name}') + self.proj_dir = pabspath(self.proj_dir) # normalise the link + print(f'1. 
Created/Found project folder under {self.proj_dir} path') + + # def create_result_save_dir(self): + # """ + # See ibase_project.py + # + # Returns: + # None + # """ + # if self._var.DATASET_FORMAT == 'yolov8': + # result_dir_path = pjoin(self.proj_dir, f'{datetime.now().strftime("%d-%m-%Y_%H-%M-%S")}') + # image_dir_path = pjoin(result_dir_path, 'images') + # label_dir_path = pjoin(result_dir_path, 'labels') + # yaml_path = pjoin(result_dir_path, 'data.yaml') + # return result_dir_path, image_dir_path, label_dir_path, yaml_path + # else: + # raise TypeError('Unsupported dataset format!') diff --git a/projects/helmet_project.py b/projects/helmet_project.py new file mode 100644 index 0000000..9a42f9e --- /dev/null +++ b/projects/helmet_project.py @@ -0,0 +1,60 @@ +from ultralytics import YOLO + +from projects.base_project import BaseProject +from projects.ihelmet_project import IHelmetProject + + +class HelmetProject(BaseProject, IHelmetProject): + """ + Helmet Project that implements functions for helmet-detection project. + """ + + def __init__(self): + """ + Constructor. + """ + super().__init__() + self.create_proj_save_dir('helmet_detection') + + def condition_func(self, results1, results2, mapping): + """ + See ihelmet_project.py + + Returns: + None + """ + person_model1 = 0 + person_model2 = mapping.get(person_model1) # Mapping person from model1 to model2 + + return ( + len(results1.boxes) > 0 + and len(results2.boxes) > 0 + and any(box.cls == person_model2 for box in results2.boxes) + and any(box.cls == 1 for box in results2.boxes) + ) + + def class_mapping(self, model1: YOLO, model2: YOLO): + """ + See ihelmet_project.py + + Returns: + mapping dictionary. + e.g: {0:2} where: + - 0 is the class of model1. + - 2 is the corresponding class of model2. 
+ """ + model_1_classes = self._var.MODEL_ALLOWED_CLASSES + model_2_classes = self._var.MODEL_2_ALLOWED_CLASSES + mapping = {} + + # Loop through allowed classes in model1 and model2 + for index1 in model_1_classes: + class_name1 = model1.names.get(index1) # Get the name for the index from model1 + + for index2 in model_2_classes: + class_name2 = model2.names.get(index2) # Get the name for the index from model2 + + if class_name1 and class_name2 and str.lower(class_name1) == str.lower(class_name2): + mapping[index1] = index2 + + return mapping diff --git a/projects/ibase_project.py b/projects/ibase_project.py new file mode 100644 index 0000000..dc34ec3 --- /dev/null +++ b/projects/ibase_project.py @@ -0,0 +1,39 @@ +from abc import ABC, abstractmethod + + +class IBaseProject(ABC): + """ + Interface for Base Project. + """ + + @abstractmethod + def condition_func(self, results1, results2, mapping): + """ + Defines a condition function that operates logic on results of 2 models. + In Base Class it raises NotImplementedError, every project should override this function with custom logic. + + Args: + results1: The result of the 1st model. + results2: The result of the 2nd model. + mapping: A mapping that defines the relationships between elements + in results1 and results2. + Returns: + See base_project.py + """ + pass + + @abstractmethod + def class_mapping(self, model1, model2): + """ + Maps classes between two models using a provided mapping. + As a convention, the 2nd model class would be used as the final result. + In Base Class it raises NotImplementedError, every project should override this function with custom logic. + + Args: + model1: The 1st input model. + model2: The 2nd input model. 
+ + Returns: + See base_project.py + """ + pass diff --git a/projects/ihelmet_project.py b/projects/ihelmet_project.py new file mode 100644 index 0000000..c600199 --- /dev/null +++ b/projects/ihelmet_project.py @@ -0,0 +1,41 @@ +from abc import ABC, abstractmethod + + +class IHelmetProject(ABC): + """ + Interface for Helmet Project. + """ + + @abstractmethod + def condition_func(self, results1, results2, mapping): + """ + Defines a condition function that operates logic on results of 2 models. + Conditions: + - results1 and results2 not empty + - Both contains person class + - results2 contains helmet + + Args: + results1: The result of the 1st model. + results2: The result of the 2nd model. + mapping: A mapping that defines the relationships between elements + in results1 and results2. + Returns: + See helmet_project.py + """ + pass + + @abstractmethod + def class_mapping(self, model1, model2): + """ + Maps classes between two models using a provided mapping. + As a convention, the 2nd model class would be used as the final result. + + Args: + model1: The 1st input model. + model2: The 2nd input model. + + Returns: + See helmet_project.py + """ + pass diff --git a/projects/project_factory.py b/projects/project_factory.py new file mode 100644 index 0000000..47b8aaa --- /dev/null +++ b/projects/project_factory.py @@ -0,0 +1,23 @@ +from projects.helmet_project import HelmetProject + + +class ProjectFactory: + """ + Project Factory initializes specific projects. + """ + + def init(self, name): + """ + Initializes specific projects with given name. + + Args: + name: name of the project, should be 'helmet'. + + Returns: + Initialized corresponding project object. 
+ """ + if name == 'helmet': + print('Initializing Helmet Detection Project...') + return HelmetProject() + else: + raise ModuleNotFoundError('Project not found!') diff --git a/queue_harvesting.py b/queue_harvesting.py index 94fd3f5..0179cba 100644 --- a/queue_harvesting.py +++ b/queue_harvesting.py @@ -1,258 +1,75 @@ # This script is used to look for objects under a specific condition (at least 5 persons etc) # The script reads a video from a message queue, classifies the objects in the video, and does a condition check. # If condition is met, the video is being forwarded to a remote vault. - -from connections.roboflow_helper import RoboflowHelper -# Local imports -from condition import processFrame +from integrations.integration_factory import IntegrationFactory +from projects.project_factory import ProjectFactory +from services.harvest_service import HarvestService from utils.VariableClass import VariableClass -from utils.ClassificationObject import ClassificationObject - -# External imports -import os -from os.path import ( - join as pjoin, - splitext as psplitext, - basename as pbasename) -from datetime import datetime -import cv2 -import time -import requests -import torch -from ultralytics import YOLO -from uugai_python_dynamic_queue.MessageBrokers import RabbitMQ -from uugai_python_kerberos_vault.KerberosVault import KerberosVault +from utils.time_verbose_object import TimeVerbose # Initialize the VariableClass object, which contains all the necessary environment variables. 
var = VariableClass() def init(): + # Service and Project initializations + harvest_service = HarvestService() + harvest_service.connect('rabbitmq', 'kerberos_vault') + model1, model2 = harvest_service.connect_models() - # Initialize a message broker using the python_queue_reader package - if var.LOGGING: - print('a) Initializing RabbitMQ') - - rabbitmq = RabbitMQ( - queue_name=var.QUEUE_NAME, - target_queue_name=var.TARGET_QUEUE_NAME, - exchange=var.QUEUE_EXCHANGE, - host=var.QUEUE_HOST, - username=var.QUEUE_USERNAME, - password=var.QUEUE_PASSWORD) - - # Initialize Kerberos Vault - if var.LOGGING: - print('b) Initializing Kerberos Vault') - kerberos_vault = KerberosVault( - storage_uri=var.STORAGE_URI, - storage_access_key=var.STORAGE_ACCESS_KEY, - storage_secret_key=var.STORAGE_SECRET_KEY) + project = ProjectFactory().init('helmet') + # Mapping classes of 2 models + mapping = project.class_mapping(model1, model2) + integration = IntegrationFactory().init() while True: + # Receive message from the queue, + # and retrieve the media from the Kerberos Vault utilizing the message information. + message = harvest_service.receive_message() + if message is None: + continue # No message received, continue to the next iteration - # Receive message from the queue, and retrieve the media from the Kerberos Vault utilizing the message information. - if var.LOGGING: - print('1) Receiving message from RabbitMQ') - message = rabbitmq.receive_message() - if message == []: - if var.LOGGING: - print('No message received, waiting for 3 seconds') - time.sleep(3) - continue - if var.LOGGING: - print('2) Retrieving media from Kerberos Vault') - - mediaKey = message['payload']['key'] - provider = message['source'] - resp = kerberos_vault.retrieve_media( - message=message, - media_type='video', - media_savepath=var.MEDIA_SAVEPATH) - - # Initialize the time variables. 
- start_time = time.time() - total_time_preprocessing = 0 - total_time_class_prediction = 0 - total_time_processing = 0 - total_time_postprocessing = 0 - start_time_preprocessing = time.time() - - # Perform object classification on the media - # initialise the yolo model, additionally use the device parameter to specify the device to run the model on. - device = 'cuda' if torch.cuda.is_available() else 'cpu' - MODEL = YOLO(var.MODEL_NAME).to(device) - MODEL2 = None - if var.MODEL_NAME_2: - MODEL2 = YOLO(var.MODEL_NAME_2).to(device) - if var.LOGGING: - print(f'3) Using device: {device}') - - # Open video-capture/recording using the video-path. Throw FileNotFoundError if cap is unable to open. - if var.LOGGING: - print(f'4) Opening video file: {var.MEDIA_SAVEPATH}') - if not os.path.exists(var.MEDIA_SAVEPATH): - raise FileNotFoundError(f'Cannot find {var.MEDIA_SAVEPATH}') - if not var.MEDIA_SAVEPATH.lower().endswith(('.mp4', '.avi', '.mov')): - raise TypeError('Unsupported file format! Only support videos with .mp4, .avi, .mov extensions') - cap = cv2.VideoCapture(var.MEDIA_SAVEPATH) - if not cap.isOpened(): - raise FileNotFoundError('Unable to open video file') - video_out = None - # Initialize the video-writer if the SAVE_VIDEO is set to True. - if var.SAVE_VIDEO: - fourcc = cv2.VideoWriter.fourcc(*'avc1') - video_out = cv2.VideoWriter( - filename=var.OUTPUT_MEDIA_SAVEPATH, - fourcc=fourcc, - fps=var.CLASSIFICATION_FPS, - frameSize=(int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), - int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) - ) - - # Initialize the classification process. - # 2 lists are initialized: - # Classification objects - # Additional list for easy access to the ids. + media_key, provider = message['payload']['key'], message['source'] - # frame_number -> The current frame number. Depending on the frame_skip_factor this can make jumps. - # predicted_frames -> The number of frames, that were used for the prediction. This goes up by one each prediction iteration. 
- # frame_skip_factor is the factor by which the input video frames are skipped. - frame_number, predicted_frames = 0, 0 - frame_skip_factor = int( - cap.get(cv2.CAP_PROP_FPS) / var.CLASSIFICATION_FPS) + time_verbose = TimeVerbose() + cap = harvest_service.open_video(message) - # Loop over the video frames, and perform object classification. - # The classification process is done until the counter reaches the MAX_NUMBER_OF_PREDICTIONS or the last frame is reached. - MAX_FRAME_NUMBER = cap.get(cv2.CAP_PROP_FRAME_COUNT) if var.LOGGING: - print(f'5) Classifying frames') + print(f'5. Classifying frames') if var.TIME_VERBOSE: - total_time_preprocessing += time.time() - start_time_preprocessing - start_time_processing = time.time() - - skip_frames_counter = 0 - - result_dir_path = pjoin(pjoin(pjoin(os.getcwd(), 'data'), 'frames'), - f'{datetime.now().strftime("%d-%m-%Y_%H-%M-%S")}') - image_dir_path = pjoin(result_dir_path, 'images') - label_dir_path = pjoin(result_dir_path, 'labels') - yaml_path = pjoin(result_dir_path, 'data.yaml') - - while (predicted_frames < var.MAX_NUMBER_OF_PREDICTIONS) and (frame_number < MAX_FRAME_NUMBER): - - # Read the frame from the video-capture. - success, frame = cap.read() - if not success: - break - - # Check if we need to skip the current frame due to the skip_frames_counter. - if skip_frames_counter > 0: - skip_frames_counter -= 1 - frame_number += 1 - continue - - # Check if the frame_number corresponds to a frame that should be classified. 
- if frame_number > 0 and frame_skip_factor > 0 and frame_number % frame_skip_factor == 0: - frame, total_time_class_prediction, condition_met, labels_and_boxes = processFrame( - MODEL, MODEL2, frame, video_out, result_dir_path) - - # Create new directory to save frames, labels and boxes for when the first frame met the condition - if predicted_frames == 0 and condition_met: - os.makedirs(f'{image_dir_path}', exist_ok=True) - os.makedirs(f'{label_dir_path}', exist_ok=True) - if condition_met: - print(f'Processing frame: {predicted_frames}') - # Save original frame - unix_time = int(time.time()) - cv2.imwrite( - f'{image_dir_path}/{unix_time}.png', - frame) - print("Saving frame, labels and boxes") - # Save labels and boxes - with open(f'{label_dir_path}/{unix_time}.txt', - 'w') as my_file: - my_file.write(labels_and_boxes) - - # Set the skip_frames_counter to 50 to skip the next 50 frames. - skip_frames_counter = 50 - - # Increase the frame_number and predicted_frames by one. - predicted_frames += 1 - - frame_number += 1 - - # Create yaml file afterward - # Upload to roboflow after processing frames if any - if os.path.exists(result_dir_path) and var.RBF_UPLOAD: - label_names = [name for name in list(MODEL.names.values())] - create_yaml(yaml_path, label_names) + time_verbose.add_preprocessing_time() + save_dir = harvest_service.process( + cap, + model1, + model2, + project.condition_func, + mapping) - rb = RoboflowHelper() - if rb: - rb.upload_dataset(result_dir_path) - else: - print('Nothing to upload!!') + if var.DATASET_UPLOAD: + integration.upload_dataset(save_dir) # We might remove the recording from the vault after analyzing it. (default is False) # This might be the case if we only need to create a dataset from the recording and do not need to store it. # Delete the recording from Kerberos Vault if the REMOVE_AFTER_PROCESSED is set to True. 
- removeAfterProcessed = os.getenv( - "REMOVE_AFTER_PROCESSED", "False") - if removeAfterProcessed == "True": - # Delete the recording from Kerberos Vault - response = requests.delete( - var.STORAGE_URI + '/storage', - headers={ - 'X-Kerberos-Storage-FileName': mediaKey, - 'X-Kerberos-Storage-Provider': provider, - 'X-Kerberos-Storage-AccessKey': var.STORAGE_ACCESS_KEY, - 'X-Kerberos-Storage-SecretAccessKey': var.STORAGE_SECRET_KEY, - } - ) - if response.status_code != 200: - print( - "Something went wrong while delete media: " + response.content) - else: - print("Delete media from " + var.STORAGE_URI) + harvest_service.delete_media(media_key, provider) if var.TIME_VERBOSE: - total_time_processing += time.time() - start_time_processing + time_verbose.add_preprocessing_time() # Depending on the TIME_VERBOSE parameter, the time it took to classify the objects is printed. if var.TIME_VERBOSE: - print( - f'\t - Classification took: {round(time.time() - start_time, 1)} seconds, @ {var.CLASSIFICATION_FPS} fps.') - print( - f'\t\t - {round(total_time_preprocessing, 2)}s for preprocessing and initialisation') - print( - f'\t\t - {round(total_time_processing, 2)}s for processing of which:') - print( - f'\t\t\t - {round(total_time_class_prediction, 2)}s for class prediction') - print( - f'\t\t\t - {round(total_time_processing - total_time_class_prediction, 2)}s for other processing') - print( - f'\t\t - {round(total_time_postprocessing, 2)}s for postprocessing') - print(f'\t - Original video: {round(cap.get(cv2.CAP_PROP_FRAME_COUNT)/cap.get(cv2.CAP_PROP_FPS), 1)} seconds, @ {round(cap.get(cv2.CAP_PROP_FPS), 1)} fps @ {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}. File size of {round(os.path.getsize(var.MEDIA_SAVEPATH)/1024**2, 1)} MB') + time_verbose.show_result() - # If the videowriter was active, the videowriter is released. - # Close the video-capture and destroy all windows. 
if var.LOGGING: print('8) Releasing video writer and closing video capture') print("\n\n") - video_out.release() if var.SAVE_VIDEO else None - cap.release() - if var.PLOT: - cv2.destroyAllWindows() + # TODO: restore video-writer release and window cleanup once video_out/cv2 are exposed here + # video_out.release() if var.SAVE_VIDEO else None + cap.release() # release the capture opened for this message so handles do not accumulate across loop iterations + # if var.PLOT: + # cv2.destroyAllWindows() -def create_yaml(file_path, label_names): - with open(file_path, 'w') as my_file: - content ='names:\n' - for name in label_names: - content += f'- {name}\n' # class mapping for helmet detection project - content += f'nc: {len(label_names)}' - my_file.write(content) # Run the init function. init() diff --git a/services/harvest_service.py b/services/harvest_service.py new file mode 100644 index 0000000..001c6d2 --- /dev/null +++ b/services/harvest_service.py @@ -0,0 +1,252 @@ +from uugai_python_dynamic_queue.MessageBrokers import RabbitMQ +from uugai_python_kerberos_vault.KerberosVault import KerberosVault + +from exports.export_factory import ExportFactory +from services.iharvest_service import IHarvestService +from utils.VariableClass import VariableClass +import time +import requests +from ultralytics import YOLO +import torch +import os +import cv2 +from condition import process_frame as con_process_frame + + +class HarvestService(IHarvestService): + """ + HarvestService class responsible for handling tasks such as connecting to + RabbitMQ and Kerberos Vault, receiving and processing messages, downloading + and opening video files, and processing video frames using YOLO models. + """ + + def __init__(self): + """ + Constructor. + """ + self.rabbitmq = None + self.vault = None + self.frame_number = 0 + self.predicted_frames = 0 + self.max_frame_number = None + self.frame_skip_factor = 0 + # Initialize the VariableClass object, which contains all the necessary environment variables.
+ self._var = VariableClass() + self._save_format = ExportFactory().init(proj_name='helmet_detection') + + def connect(self, *agents): + """ + See iharvest_service.py + """ + if 'rabbitmq' not in agents and 'kerberos_vault' not in agents: + raise TypeError('Missing agent!') + + if self._var.LOGGING: + print('a) Initializing RabbitMQ') + + # Initialize a message broker using the python_queue_reader package + self.rabbitmq = RabbitMQ( + queue_name=self._var.QUEUE_NAME, + target_queue_name=self._var.TARGET_QUEUE_NAME, + exchange=self._var.QUEUE_EXCHANGE, + host=self._var.QUEUE_HOST, + username=self._var.QUEUE_USERNAME, + password=self._var.QUEUE_PASSWORD) + + if self._var.LOGGING: + print('b) Initializing Kerberos Vault') + + self.vault = KerberosVault( + storage_uri=self._var.STORAGE_URI, + storage_access_key=self._var.STORAGE_ACCESS_KEY, + storage_secret_key=self._var.STORAGE_SECRET_KEY) + + def receive_message(self): + """ + See iharvest_service.py + + Returns: + dict or None: The received message if available, otherwise None. + """ + # Receive message from the queue, + # and retrieve the media from the Kerberos Vault utilizing the message information. 
+ if self._var.LOGGING: + print('1) Receiving message from RabbitMQ') + message = self.rabbitmq.receive_message() + if not message: + if self._var.LOGGING: + print('No message received, waiting for 3 seconds') + time.sleep(3) + return None + if self._var.LOGGING: + print('2) Retrieving media from Kerberos Vault') + + self.vault.retrieve_media( + message=message, + media_type='video', + media_savepath=self._var.MEDIA_SAVEPATH) + return message + + def delete_media(self, media_key, provider): + """ + See iharvest_service.py + """ + if self._var.REMOVE_AFTER_PROCESSED: + # Delete the recording from Kerberos Vault + response = requests.delete( + self._var.STORAGE_URI + '/storage', + headers={ + 'X-Kerberos-Storage-FileName': media_key, + 'X-Kerberos-Storage-Provider': provider, + 'X-Kerberos-Storage-AccessKey': self._var.STORAGE_ACCESS_KEY, + 'X-Kerberos-Storage-SecretAccessKey': self._var.STORAGE_SECRET_KEY, + } + ) + if response.status_code != 200: + print( + "Something went wrong while delete media: " + response.text) # .text is str; .content is bytes and would raise TypeError when concatenated + else: + print("Delete media from " + self._var.STORAGE_URI) + + def connect_models(self): + """ + See iharvest_service.py + """ + _cur_dir = os.getcwd() + # initialise the yolo model, additionally use the device parameter to specify the device to run the model on. + device = 'cuda' if torch.cuda.is_available() else 'cpu' + model = YOLO(self._var.MODEL_NAME).to(device) + model2 = YOLO(self._var.MODEL_NAME_2).to(device) + if model and model2: + print(f'2. Using device: {device}') + print(f'3. Using models: {self._var.MODEL_NAME} and {self._var.MODEL_NAME_2}') + return model, model2 + else: + raise ModuleNotFoundError('Something wrong happened!') + + def open_video(self, message=''): + """ + See iharvest_service.py + + Returns: + cv2.VideoCapture: The video capture object for the opened video.
+ """ + if message: + # Download video from vault if there is a message + self.__download_video__(message) + + # Open video-capture/recording using the video-path. Throw FileNotFoundError if cap is unable to open. + if self._var.LOGGING: + print(f'4. Opening video file: {self._var.MEDIA_SAVEPATH}') + if not os.path.exists(self._var.MEDIA_SAVEPATH): + raise FileNotFoundError(f'Cannot find {self._var.MEDIA_SAVEPATH}') + if not self._var.MEDIA_SAVEPATH.lower().endswith(('.mp4', '.avi', '.mov')): + raise TypeError('Unsupported file format! Only support videos with .mp4, .avi, .mov extensions') + cap = cv2.VideoCapture(self._var.MEDIA_SAVEPATH) + if not cap.isOpened(): + raise FileNotFoundError('Unable to open video file') + # Initialize the video-writer if the SAVE_VIDEO is set to True. + # if self._var.SAVE_VIDEO: + # fourcc = cv2.VideoWriter.fourcc(*'avc1') + # video_out = cv2.VideoWriter( + # filename=self._var.OUTPUT_MEDIA_SAVEPATH, + # fourcc=fourcc, + # fps=self._var.CLASSIFICATION_FPS, + # frameSize=(int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), + # int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) + # ) + self.frame_number = 0 + self.predicted_frames = 0 + self.max_frame_number = cap.get(cv2.CAP_PROP_FRAME_COUNT) + self.frame_skip_factor = int( + cap.get(cv2.CAP_PROP_FPS) / self._var.CLASSIFICATION_FPS) + return cap + + def process(self, cap, model1, model2, condition_func, mapping): + if self.max_frame_number > 0: + skip_frames_counter = 0 + + # Create save dir and yaml file + success = self._save_format.initialize_save_dir() + if success and self._var.DATASET_FORMAT == 'roboflow': + self._save_format.create_yaml(model2) + + while (self.predicted_frames < self._var.MAX_NUMBER_OF_PREDICTIONS) and ( + self.frame_number < self.max_frame_number): + # Read the frame from the video-capture. 
+ success, frame, skip_frames_counter = self.get_frame(cap, skip_frames_counter) + # Advance the frame index exactly once per loop iteration (decoded or skipped frame). + self.frame_number += 1 + + if not success: + break + + if frame is None: + continue + + # Process frame + skip_frames_counter = self.predict_frame( + model1, + model2, + frame, + condition_func, + mapping, + skip_frames_counter) + return self._save_format.result_dir_path + + def get_frame(self, cap: cv2.VideoCapture, skip_frames_counter): + """ + See iharvest_service.py + + Returns: + tuple: A tuple containing a boolean indicating success, the frame (or None), + and the updated skip frames counter. + """ + # While skipping, still advance the capture (grab without decoding) so the frame is really consumed; a failed grab reports end of video. + if skip_frames_counter > 0: + return cap.grab(), None, skip_frames_counter - 1 + + success, frame = cap.read() + if not success: + return False, None, skip_frames_counter + + return True, frame, skip_frames_counter + + def predict_frame( + self, + model1, + model2, + frame, + condition_func, + mapping, + skip_frames_counter): + """ + See iharvest_service.py + + Returns: + int: The updated skip frames counter. + """ + if self.frame_number > 0 and self.frame_skip_factor > 0 and self.frame_number % self.frame_skip_factor == 0: + frame, total_time_class_prediction, condition_met, labels_and_boxes = con_process_frame( + model1, model2, frame, condition_func, mapping) + + if condition_met: + self.predicted_frames = self._save_format.save_frame(frame, self.predicted_frames, cv2, + labels_and_boxes) + skip_frames_counter = self._var.FRAMES_SKIP_AFTER_DETECT + print(f'Currently in frame: {self.frame_number}') + # frame_number is advanced once per iteration by process(); incrementing it again here made the modulo check above skip every even index. + return skip_frames_counter + + def __download_video__(self, message): + """ + Downloads the video from Kerberos Vault using the provided message details. + + Args: + message: The message containing details required to retrieve the video.
+ """ + self.vault.retrieve_media( + message=message, + media_type='video', + media_savepath=self._var.MEDIA_SAVEPATH) + print(f'Video downloaded under {self._var.MEDIA_SAVEPATH}') diff --git a/services/iharvest_service.py b/services/iharvest_service.py new file mode 100644 index 0000000..871e96d --- /dev/null +++ b/services/iharvest_service.py @@ -0,0 +1,84 @@ +from abc import ABC, abstractmethod + + +class IHarvestService(ABC): + """ + Interface for Harvest Service + """ + + @abstractmethod + def connect(self, *agents): + """ + Connects to the required agents, specifically RabbitMQ and Kerberos Vault. + + Args: + agents (tuple): A tuple containing the names of agents to connect to. + Must include 'rabbitmq' and/or 'kerberos_vault'. + + Raises: + TypeError: If neither 'rabbitmq' nor 'kerberos_vault' is included in agents. + """ + pass + + @abstractmethod + def receive_message(self): + """ + Receives a message from RabbitMQ and retrieves the corresponding media + from Kerberos Vault. + + """ + pass + + @abstractmethod + def delete_media(self, media_key, provider): + """ + Deletes the processed recording from Kerberos Vault if + REMOVE_AFTER_PROCESSED is set to True. + + Args: + media_key: The key of the media to delete from the vault. + provider: The provider information for the media in the vault. + """ + pass + + @abstractmethod + def connect_models(self): + """ + Initializes the YOLO models and connects them to the appropriate device (CPU or GPU). + + Returns: + tuple: A tuple containing two YOLO models. + + Raises: + ModuleNotFoundError: If the models cannot be loaded. + """ + pass + + @abstractmethod + def open_video(self, message=''): + """ + Opens a video file from the specified path, downloading it from the vault if necessary. + + Args: + message: The message to use for downloading the video. Defaults to ''. + + Raises: + FileNotFoundError: If the video file cannot be found or opened. + TypeError: If the video file format is unsupported. 
+ """ + pass + + @abstractmethod + def get_frame(self, cap, skip_frames_counter): + """ + Retrieves the next frame from the video capture object, potentially skipping frames. + + Args: + cap (cv2.VideoCapture): The video capture object. + skip_frames_counter (int): The number of frames to skip. + + Returns: + tuple: A tuple containing a boolean indicating success, the frame (or None), + and the updated skip frames counter. + """ + pass diff --git a/single-shot.py b/single-shot.py index fffb0d5..7d74563 100644 --- a/single-shot.py +++ b/single-shot.py @@ -2,73 +2,39 @@ # The script reads a video from a message queue, classifies the objects in the video, and does a condition check. # If condition is met, the video is being forwarded to a remote vault. -from connections.roboflow_helper import RoboflowHelper +from projects.project_factory import ProjectFactory +from services.harvest_service import HarvestService +from integrations.roboflow_integration import RoboflowIntegration # Local imports from utils.VariableClass import VariableClass -from condition import processFrame # External imports import os -from os.path import ( - join as pjoin, - splitext as psplitext, - basename as pbasename) -from datetime import datetime + import cv2 -import time -import torch -from ultralytics import YOLO + +from utils.time_verbose_object import TimeVerbose # Initialize the VariableClass object, which contains all the necessary environment variables. var = VariableClass() def init(): + harvest_service = HarvestService() + model1, model2 = harvest_service.connect_models() - # Initialize the time variables. 
- start_time = time.time() - total_time_preprocessing = 0 - total_time_class_prediction = 0 - total_time_processing = 0 - total_time_postprocessing = 0 - start_time_preprocessing = time.time() + project = ProjectFactory().init('helmet') # Perform object classification on the media - # initialise the yolo model, additionally use the device parameter to specify the device to run the model on. - device = 'cuda' if torch.cuda.is_available() else 'cpu' - MODEL = YOLO(var.MODEL_NAME).to(device) - MODEL2 = None - if var.MODEL_NAME_2: - MODEL2 = YOLO(var.MODEL_NAME_2).to(device) - if var.LOGGING: - print(f'3) Using device: {device}') # Open video-capture/recording using the video-path. Throw FileNotFoundError if cap is unable to open. - if var.LOGGING: - print(f'4) Opening video file: {var.MEDIA_SAVEPATH}') - if not os.path.exists(var.MEDIA_SAVEPATH): - raise FileNotFoundError(f'Cannot find {var.MEDIA_SAVEPATH}') - if not var.MEDIA_SAVEPATH.lower().endswith(('.mp4', '.avi', '.mov')): - raise TypeError('Unsupported file format! Only support videos with .mp4, .avi, .mov extensions') - cap = cv2.VideoCapture(var.MEDIA_SAVEPATH) - if not cap.isOpened(): - raise FileNotFoundError('Unable to open video file') - video_out = None - # Initialize the video-writer if the SAVE_VIDEO is set to True. - if var.SAVE_VIDEO: - fourcc = cv2.VideoWriter.fourcc(*'avc1') - video_out = cv2.VideoWriter( - filename=var.OUTPUT_MEDIA_SAVEPATH, - fourcc=fourcc, - fps=var.CLASSIFICATION_FPS, - frameSize=(int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), - int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) - ) + cap = harvest_service.open_video() + time_verbose = TimeVerbose() # Initialize the classification process. # 2 lists are initialized: - # Classification objects - # Additional list for easy access to the ids. + # Classification objects + # Additional list for easy access to the ids. # frame_number -> The current frame number. Depending on the frame_skip_factor this can make jumps. 
# predicted_frames -> The number of frames, that were used for the prediction. This goes up by one each prediction iteration. @@ -82,108 +48,72 @@ def init(): if var.LOGGING: print(f'5) Classifying frames') if var.TIME_VERBOSE: - total_time_preprocessing += time.time() - start_time_preprocessing - start_time_processing = time.time() + time_verbose.add_preprocessing_time() skip_frames_counter = 0 - result_dir_path = pjoin(pjoin(pjoin(os.getcwd(), 'data'), 'frames'), f'{datetime.now().strftime("%d-%m-%Y_%H-%M-%S")}') - image_dir_path = pjoin(result_dir_path, 'images') - label_dir_path = pjoin(result_dir_path, 'labels') - yaml_path = pjoin(result_dir_path, 'data.yaml') + result_dir_path, image_dir_path, label_dir_path, yaml_path = project.create_result_save_dir() + mapping = project.class_mapping(model1, model2) while (predicted_frames < var.MAX_NUMBER_OF_PREDICTIONS) and (frame_number < MAX_FRAME_NUMBER): + success, frame, skip_frames_counter = harvest_service.get_video_frame(cap, skip_frames_counter) - # Read the frame from the video-capture. - success, frame = cap.read() + if success and frame is None: + continue if not success: break - # Check if we need to skip the current frame due to the skip_frames_counter. - if skip_frames_counter > 0: - skip_frames_counter -= 1 - frame_number += 1 - continue - - # Check if the frame_number corresponds to a frame that should be classified. 
- if frame_number > 0 and frame_skip_factor > 0 and frame_number % frame_skip_factor == 0: - frame, total_time_class_prediction, condition_met, labels_and_boxes = processFrame( - MODEL, MODEL2, frame, video_out, result_dir_path) - - # Create new directory to save frames, labels and boxes for when the first frame met the condition - if predicted_frames == 0 and condition_met: - os.makedirs(f'{image_dir_path}', exist_ok=True) - os.makedirs(f'{label_dir_path}', exist_ok=True) - if condition_met: - print(f'Processing frame: {predicted_frames}') - # Save original frame - unix_time = int(time.time()) - cv2.imwrite( - f'{image_dir_path}/{unix_time}.png', - frame) - print("Saving frame, labels and boxes") - # Save labels and boxes - with open(f'{label_dir_path}/{unix_time}.txt', - 'w') as my_file: - my_file.write(labels_and_boxes) - - # Set the skip_frames_counter to 50 to skip the next 50 frames. - skip_frames_counter = 50 - - # Increase the frame_number and predicted_frames by one. - predicted_frames += 1 - - frame_number += 1 - - # Create yaml file afterward - label_names = [name for name in list(MODEL.names.values())] - create_yaml(yaml_path, label_names) + # Process frame + skip_frames_counter = harvest_service.process_frame( + frame_skip_factor, + skip_frames_counter, + model1, + model2, + project.condition_func, + mapping, + result_dir_path, + image_dir_path, + label_dir_path, + frame, + None) + + # Upload to roboflow + project.upload_dataset(result_dir_path, yaml_path, model2) # Upload to roboflow after processing frames if any if os.path.exists(result_dir_path) and var.RBF_UPLOAD: - rb = RoboflowHelper() + rb = RoboflowIntegration() if rb: rb.upload_dataset(result_dir_path) else: print('Nothing to upload!!') if var.TIME_VERBOSE: - total_time_processing += time.time() - start_time_processing + time_verbose.add_preprocessing_time() # Depending on the TIME_VERBOSE parameter, the time it took to classify the objects is printed. 
if var.TIME_VERBOSE: - print( - f'\t - Classification took: {round(time.time() - start_time, 1)} seconds, @ {var.CLASSIFICATION_FPS} fps.') - print( - f'\t\t - {round(total_time_preprocessing, 2)}s for preprocessing and initialisation') - print( - f'\t\t - {round(total_time_processing, 2)}s for processing of which:') - print( - f'\t\t\t - {round(total_time_class_prediction, 2)}s for class prediction') - print( - f'\t\t\t - {round(total_time_processing - total_time_class_prediction, 2)}s for other processing') - print( - f'\t\t - {round(total_time_postprocessing, 2)}s for postprocessing') - print(f'\t - Original video: {round(cap.get(cv2.CAP_PROP_FRAME_COUNT)/cap.get(cv2.CAP_PROP_FPS), 1)} seconds, @ {round(cap.get(cv2.CAP_PROP_FPS), 1)} fps @ {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}. File size of {round(os.path.getsize(var.MEDIA_SAVEPATH)/1024**2, 1)} MB') - + time_verbose.show_result() # If the videowriter was active, the videowriter is released. # Close the video-capture and destroy all windows. if var.LOGGING: print('8) Releasing video writer and closing video capture') print("\n\n") - video_out.release() if var.SAVE_VIDEO else None - cap.release() - if var.PLOT: - cv2.destroyAllWindows() + # video_out.release() if var.SAVE_VIDEO else None + # cap.release() + # if var.PLOT: + # cv2.destroyAllWindows() + def create_yaml(file_path, label_names): with open(file_path, 'w') as my_file: - content ='names:\n' + content = 'names:\n' for name in label_names: - content += f'- {name}\n' # class mapping for helmet detection project + content += f'- {name}\n' # class mapping for helmet detection project content += f'nc: {len(label_names)}' my_file.write(content) + # Run the init function. 
init() diff --git a/utils/VariableClass.py b/utils/VariableClass.py index 2eab4a6..04c2328 100644 --- a/utils/VariableClass.py +++ b/utils/VariableClass.py @@ -19,6 +19,12 @@ def __init__(self): self.MODEL_NAME = os.getenv("MODEL_NAME") self.MEDIA_SAVEPATH = os.getenv("MEDIA_SAVEPATH") self.MODEL_NAME_2 = os.getenv("MODEL_NAME_2") + self.MODEL_ALLOWED_CLASSES = list(map(int, os.getenv('MODEL_ALLOWED_CLASSES', '0').split(','))) + self.MODEL_2_ALLOWED_CLASSES = list(map(int, os.getenv('MODEL_2_ALLOWED_CLASSES', '0').split(','))) + + # Dataset parameters + self.DATASET_FORMAT = os.getenv("DATASET_FORMAT") + self.DATASET_VERSION = os.getenv("DATASET_VERSION") # Queue parameters self.QUEUE_NAME = os.getenv("QUEUE_NAME") @@ -44,6 +50,7 @@ def __init__(self): self.CREATE_BBOX_FRAME = os.getenv("CREATE_BBOX_FRAME") == "True" self.SAVE_BBOX_FRAME = os.getenv("SAVE_BBOX_FRAME") == "True" self.BBOX_FRAME_SAVEPATH = os.getenv("BBOX_FRAME_SAVEPATH") + self.REMOVE_AFTER_PROCESSED = os.getenv("REMOVE_AFTER_PROCESSED") == "True" if self.SAVE_BBOX_FRAME: self.CREATE_BBOX_FRAME = True @@ -54,6 +61,7 @@ def __init__(self): self.CREATE_RETURN_JSON = True self.SAVE_VIDEO = os.getenv("SAVE_VIDEO") == "True" + self.SAVE_FRAMES = os.getenv("SAVE_FRAMES") == "True" self.OUTPUT_MEDIA_SAVEPATH = os.getenv("OUTPUT_MEDIA_SAVEPATH") self.FIND_DOMINANT_COLORS = os.getenv("FIND_DOMINANT_COLORS") == "True" @@ -70,7 +78,7 @@ def __init__(self): self.CLASSIFICATION_FPS = int(os.getenv("CLASSIFICATION_FPS", "15")) if os.getenv("CLASSIFICATION_THRESHOLD") is not None and os.getenv("CLASSIFICATION_THRESHOLD") != "": self.CLASSIFICATION_THRESHOLD = float( - os.getenv("CLASSIFICATION_THRESHOLD")) + os.getenv("CLASSIFICATION_THRESHOLD")) if os.getenv("MAX_NUMBER_OF_PREDICTIONS") is not None and os.getenv("CLASSIFICATION_FPS") != "": self.MAX_NUMBER_OF_PREDICTIONS = int( os.getenv("MAX_NUMBER_OF_PREDICTIONS", "50")) @@ -81,7 +89,8 @@ def __init__(self): os.getenv("MIN_STATIC_DISTANCE", "100")) if
os.getenv("MIN_DETECTIONS") is not None and os.getenv("MIN_DETECTIONS") != "": self.MIN_DETECTIONS = int(os.getenv("MIN_DETECTIONS", "5")) - + self.FRAMES_SKIP_AFTER_DETECT = int(os.getenv("FRAMES_SKIP_AFTER_DETECT", "50")) + self.IOU = float(os.getenv("IOU", "0.85")) ALLOWED_CLASSIFICATIONS_STR = os.getenv("ALLOWED_CLASSIFICATIONS") self.ALLOWED_CLASSIFICATIONS = [ @@ -90,8 +99,18 @@ def __init__(self): self.TRANSLATED_CLASSIFICATIONS = [ item.strip() for item in TRANSLATED_CLASSIFICATIONS_STR.split(',')] + # Integration parameters + self.INTEGRATION_NAME = os.getenv("INTEGRATION_NAME") + # Roboflow parameters self.ROBOFLOW_API_KEY = os.getenv("RBF_API_KEY") self.ROBOFLOW_WORKSPACE = os.getenv("RBF_WORKSPACE") self.ROBOFLOW_PROJECT = os.getenv("RBF_PROJECT") - self.RBF_UPLOAD = os.getenv("RBF_UPLOAD") == "True" + self.DATASET_UPLOAD = os.getenv("DATASET_UPLOAD") == "True" + + # S3 parameters + self.S3_ENDPOINT = os.getenv("S3_ENDPOINT") + self.S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY") + self.S3_SECRET_KEY = os.getenv("S3_SECRET_KEY") + self.S3_BUCKET = os.getenv("S3_BUCKET") + diff --git a/utils/time_verbose_object.py b/utils/time_verbose_object.py new file mode 100644 index 0000000..e03ee7d --- /dev/null +++ b/utils/time_verbose_object.py @@ -0,0 +1,44 @@ +import time + + +class TimeVerbose: + """ + Time Verbose class tracks and reports the time spent in different stages of a process, + including preprocessing, processing, and postprocessing. + """ + + def __init__(self): + """ + Constructor. + """ + self.start_time = time.time() + self.total_time_preprocessing = 0 + self.total_time_class_prediction = 0 + self.total_time_processing = 0 + self.total_time_postprocessing = 0 + self.start_time_preprocessing = time.time() + + def add_preprocessing_time(self): + """ + Adds the time spent in preprocessing to the total time and resets the + start time for the next preprocessing segment. 
+ """ + self.total_time_preprocessing += time.time() - self.start_time_preprocessing + self.start_time_preprocessing = time.time() + + def show_result(self): + """ + Prints a detailed breakdown of the time spent on different stages. + """ + print( + f'\t - Classification took: {round(time.time() - self.start_time, 1)} seconds.') + print( + f'\t\t - {round(self.total_time_preprocessing, 2)}s for preprocessing and initialization') + print( + f'\t\t - {round(self.total_time_processing, 2)}s for processing of which:') + print( + f'\t\t\t - {round(self.total_time_class_prediction, 2)}s for class prediction') + print( + f'\t\t\t - {round(self.total_time_processing - self.total_time_class_prediction, 2)}s for other processing') + print( + f'\t\t - {round(self.total_time_postprocessing, 2)}s for postprocessing')