diff --git a/nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py b/nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py index b4ad860d..13a0dede 100644 --- a/nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py +++ b/nautobot_device_lifecycle_mgmt/jobs/cve_tracking.py @@ -152,7 +152,7 @@ def run(self, *args, **kwargs): # pylint: disable=too-many-locals extra={"object": software.platform, "grouping": "CVE Information"}, ) - software_cve_info = self.get_cve_info(cpe_software_search_urls, software.id) + software_cve_info = self.get_cve_info(cpe_software_search_urls, software) all_software_cve_info = {**software_cve_info["new"], **software_cve_info["existing"]} cve_counter += len(software_cve_info["new"]) @@ -160,7 +160,16 @@ def run(self, *args, **kwargs): # pylint: disable=too-many-locals for software_cve, cve_info in all_software_cve_info.items(): matching_dlc_cve = CVELCM.objects.get(name=software_cve) - self.associate_software_to_cve(software.id, matching_dlc_cve.id) + + try: + matching_dlc_cve.affected_softwares.add(software) + except IntegrityError as err: + self.logger.error( + "Unable to create association between CVE and Software Version. ERROR: %s", + err, + extra={"object": matching_dlc_cve, "grouping": "CVE Association"}, + ) + if str(cve_info["modified_date"][0:10]) != str(matching_dlc_cve.last_modified_date): self.update_cve(matching_dlc_cve, cve_info) continue @@ -172,21 +181,6 @@ def run(self, *args, **kwargs): # pylint: disable=too-many-locals "Performed discovery on all software. Created %s CVE.", cve_counter, extra={"grouping": "CVE Creation"} ) - def associate_software_to_cve(self, software_id, cve_id): - """A function to associate software to a CVE.""" - cve = CVELCM.objects.get(id=cve_id) - software = SoftwareVersion.objects.get(id=software_id) - - try: - cve.affected_softwares.add(software) - - except IntegrityError as err: - self.logger.error( - "Unable to create association between CVE and Software Version. ERROR: %s", - err, - extra={"object": cve, "grouping": "CVE Association"}, - ) - def create_dlc_cves(self, cpe_cves: dict) -> None: """Create the list of needed items and insert into to DLC CVEs.""" created_count = 0 @@ -217,7 +211,7 @@ def create_dlc_cves(self, cpe_cves: dict) -> None: self.logger.info("Created New CVEs.", extra={"grouping": "CVE Creation"}) - def get_cve_info(self, cpe_software_search_urls: list, software_id=None) -> dict: + def get_cve_info(self, cpe_software_search_urls: list, software) -> dict: """Search NIST for software and related CVEs.""" for cpe_software_search_url in cpe_software_search_urls: result = self.query_api(cpe_software_search_url) @@ -227,22 +221,22 @@ def get_cve_info(self, cpe_software_search_urls: list, software_id=None) -> dict self.logger.info( "Received %s results.", result["totalResults"], - extra={"object": SoftwareVersion.objects.get(id=software_id), "grouping": "CVE Creation"}, + extra={"object": software, "grouping": "CVE Creation"}, ) cve_list = [cve["cve"] for cve in result["vulnerabilities"]] dlc_cves = CVELCM.objects.values_list("name", flat=True) - all_cve_info = self.process_cves(cve_list, dlc_cves, software_id) + all_cve_info = self.process_cves(cve_list, dlc_cves, software) return all_cve_info - def process_cves(self, cve_list, dlc_cves, software_id): + def process_cves(self, cve_list, dlc_cves, software): """Method to return processed CVE info. Args: cve_list (list): List of CVE returned from CPE search dlc_cves (list): List of all DLM CVE objects - software_id (object): UUID of the Software being queried + software (object): UUID of the Software being queried Returns: dict: Dictionary of CVEs in either new or existing categories @@ -260,7 +254,7 @@ def process_cves(self, cve_list, dlc_cves, software_id): processed_cve_info["existing"].update({cve_name: self.prep_cve_for_dlc(cve)}) self.logger.info( "Prepared %s CVE for creation." % len(processed_cve_info["new"]), - extra={"object": SoftwareVersion.objects.get(id=software_id), "grouping": "CVE Creation"}, + extra={"object": software, "grouping": "CVE Creation"}, ) return processed_cve_info