diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 6359b613..e9d2d9ef 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -12,3 +12,5 @@ /nautobot_ssot/integrations/meraki/ @jdrew82 @nautobot/plugin-ssot /nautobot_ssot/integrations/servicenow/ @glennmatthews @qduk @nautobot/plugin-ssot /nautobot_ssot/integrations/slurpit/ @lpconsulting321 @pietos @nautobot/plugin-ssot +/nautobot_ssot/integrations/solarwinds/ @jdrew82 @nopg @nautobot/plugin-ssot + diff --git a/README.md b/README.md index 9dd05c05..997ba529 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ This Nautobot application framework includes the following integrations: - Cisco Meraki - ServiceNow - Slurpit +- SolarWinds Read more about integrations [here](https://docs.nautobot.com/projects/ssot/en/latest/user/integrations). To enable and configure integrations follow the instructions from [the install guide](https://docs.nautobot.com/projects/ssot/en/latest/admin/install/#integrations-configuration). @@ -95,6 +96,7 @@ The SSoT framework includes a number of integrations with external Systems of Re * Cisco Meraki * ServiceNow * Slurpit +* SolarWinds > Note that the Arista CloudVision integration is currently incompatible with the [Arista Labs](https://labs.arista.com/) environment due to a TLS issue. It has been confirmed to work in on-prem environments previously. diff --git a/changes/631.added b/changes/631.added new file mode 100644 index 00000000..a53cb059 --- /dev/null +++ b/changes/631.added @@ -0,0 +1 @@ +Added integration with SolarWinds. diff --git a/changes/631.changed b/changes/631.changed new file mode 100644 index 00000000..28acc2d8 --- /dev/null +++ b/changes/631.changed @@ -0,0 +1 @@ +Added documentation for SolarWinds integration. diff --git a/development/creds.example.env b/development/creds.example.env index 49ec37da..b7b31f0d 100644 --- a/development/creds.example.env +++ b/development/creds.example.env @@ -36,6 +36,9 @@ NAUTOBOT_SSOT_CITRIX_ADM_PASSWORD="changeme" NAUTOBOT_SSOT_INFOBLOX_PASSWORD="changeme" +NAUTOBOT_SSOT_SOLARWINDS_USERNAME="admin" +NAUTOBOT_SSOT_SOLARWINDS_PASSWORD="changeme" + # ACI Credentials. Append friendly name to the end to identify each APIC. 
NAUTOBOT_APIC_BASE_URI_NTC=https://aci.cloud.networktocode.com
NAUTOBOT_APIC_USERNAME_NTC=admin
diff --git a/development/development.env b/development/development.env
index 5f1b7ee2..6458a8c0 100644
--- a/development/development.env
+++ b/development/development.env
@@ -115,4 +115,6 @@ IPFABRIC_TIMEOUT=15
 NAUTOBOT_SSOT_ENABLE_ITENTIAL="True"
 NAUTOBOT_SSOT_ENABLE_SLURPIT="False"
-SLURPIT_HOST="https://sandbox.slurpit.io"
\ No newline at end of file
+SLURPIT_HOST="https://sandbox.slurpit.io"
+
+NAUTOBOT_SSOT_ENABLE_SOLARWINDS="False"
diff --git a/development/nautobot_config.py b/development/nautobot_config.py
index a6d7b7f1..918f9ad5 100644
--- a/development/nautobot_config.py
+++ b/development/nautobot_config.py
@@ -232,6 +232,7 @@
         "enable_meraki": is_truthy(os.getenv("NAUTOBOT_SSOT_ENABLE_MERAKI")),
         "enable_servicenow": is_truthy(os.getenv("NAUTOBOT_SSOT_ENABLE_SERVICENOW")),
         "enable_slurpit": is_truthy(os.getenv("NAUTOBOT_SSOT_ENABLE_SLURPIT")),
+        "enable_solarwinds": is_truthy(os.getenv("NAUTOBOT_SSOT_ENABLE_SOLARWINDS")),
         "hide_example_jobs": is_truthy(os.getenv("NAUTOBOT_SSOT_HIDE_EXAMPLE_JOBS")),
         "device42_defaults": {
             "site_status": "Active",
diff --git a/docs/admin/install.md b/docs/admin/install.md
index 77646bd7..2643bbaa 100644
--- a/docs/admin/install.md
+++ b/docs/admin/install.md
@@ -100,3 +100,4 @@ Set up each integration using the specific guides:
 - [Cisco Meraki](./integrations/meraki_setup.md)
 - [ServiceNow](./integrations/servicenow_setup.md)
 - [Slurpit](./integrations/slurpit_setup.md)
+- [SolarWinds](./integrations/solarwinds_setup.md)
diff --git a/docs/admin/integrations/index.md b/docs/admin/integrations/index.md
index 23d7582f..ab1036b3 100644
--- a/docs/admin/integrations/index.md
+++ b/docs/admin/integrations/index.md
@@ -14,3 +14,4 @@ This Nautobot app supports the following integrations:
 - [Cisco Meraki](./meraki_setup.md)
 - [ServiceNow](./servicenow_setup.md)
 - [Slurpit](./slurpit_setup.md)
+- [SolarWinds](./solarwinds_setup.md)
diff --git a/docs/admin/integrations/solarwinds_setup.md b/docs/admin/integrations/solarwinds_setup.md
new file mode 100644
index 00000000..23443c8d
--- /dev/null
+++ b/docs/admin/integrations/solarwinds_setup.md
@@ -0,0 +1,25 @@

# SolarWinds Integration Setup

This guide will walk you through the steps to set up the SolarWinds integration with the `nautobot_ssot` app.

## Prerequisites

Before configuring the integration, please ensure that the `nautobot-ssot` app was [installed with the SolarWinds integration extra dependencies](../install.md#install-guide).

```shell
pip install nautobot-ssot[solarwinds]
```

## Configuration

Access to your SolarWinds instance is defined using the [ExternalIntegration](https://docs.nautobot.com/projects/core/en/stable/user-guide/platform-functionality/externalintegration/) model, which allows you to utilize this integration with multiple instances concurrently. Please bear in mind that the integration synchronizes all data 1:1 with the specified instance, meaning it will delete Nautobot data that is missing from that instance. Each ExternalIntegration must specify a SecretsGroup with [Secrets](https://docs.nautobot.com/projects/core/en/stable/user-guide/platform-functionality/secret/) that contain the SolarWinds administrator Username and Password to authenticate with. You can find Secrets and SecretsGroups under the Secrets menu.
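If you prefer to script this setup rather than click through the UI, below is a minimal sketch of one way it might look from `nautobot-server nbshell`, assuming the built-in environment-variable Secrets provider. The environment variable names match those added to `creds.example.env` in this change; the object names, example URL, and Extra Config values are illustrative placeholders only, so adjust them to your environment.

```python
# Minimal sketch only - object names, URL, and extra_config values are placeholders.
from nautobot.extras.choices import (
    SecretsGroupAccessTypeChoices,
    SecretsGroupSecretTypeChoices,
)
from nautobot.extras.models import (
    ExternalIntegration,
    Secret,
    SecretsGroup,
    SecretsGroupAssociation,
)

# Secrets that read the credentials from environment variables on the Nautobot host.
username_secret = Secret.objects.create(
    name="SolarWinds Username",
    provider="environment-variable",
    parameters={"variable": "NAUTOBOT_SSOT_SOLARWINDS_USERNAME"},
)
password_secret = Secret.objects.create(
    name="SolarWinds Password",
    provider="environment-variable",
    parameters={"variable": "NAUTOBOT_SSOT_SOLARWINDS_PASSWORD"},
)

# Pair the Secrets in a SecretsGroup using the HTTP(S) access type the job expects.
group = SecretsGroup.objects.create(name="SolarWinds Credentials")
SecretsGroupAssociation.objects.create(
    secrets_group=group,
    secret=username_secret,
    access_type=SecretsGroupAccessTypeChoices.TYPE_HTTP,
    secret_type=SecretsGroupSecretTypeChoices.TYPE_USERNAME,
)
SecretsGroupAssociation.objects.create(
    secrets_group=group,
    secret=password_secret,
    access_type=SecretsGroupAccessTypeChoices.TYPE_HTTP,
    secret_type=SecretsGroupSecretTypeChoices.TYPE_PASSWORD,
)

# ExternalIntegration pointing at the SolarWinds instance; extra_config is optional.
ExternalIntegration.objects.create(
    name="SolarWinds Production",
    remote_url="https://solarwinds.example.com",
    verify_ssl=True,
    http_method="GET",
    secrets_group=group,
    extra_config={"port": 17774, "retries": 5, "batch_size": 100},
)
```

This mirrors what the sync job reads at runtime: it retrieves the username and password from the SecretsGroup via the HTTP(S) access type and honors `port` and `retries` from Extra Config.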
+ +Below is an example snippet from `nautobot_config.py` that demonstrates how to enable the SolarWinds integration: + +```python +PLUGINS_CONFIG = { + "nautobot_ssot": { + "enable_solarwinds": is_truthy(os.getenv("NAUTOBOT_SSOT_ENABLE_SOLARWINDS", "true")), + } +} +``` diff --git a/docs/images/solarwinds_dashboard.png b/docs/images/solarwinds_dashboard.png new file mode 100644 index 00000000..d2f34a55 Binary files /dev/null and b/docs/images/solarwinds_dashboard.png differ diff --git a/docs/images/solarwinds_detail-view.png b/docs/images/solarwinds_detail-view.png new file mode 100644 index 00000000..3dcc1abe Binary files /dev/null and b/docs/images/solarwinds_detail-view.png differ diff --git a/docs/images/solarwinds_enabled_job.png b/docs/images/solarwinds_enabled_job.png new file mode 100644 index 00000000..8f517452 Binary files /dev/null and b/docs/images/solarwinds_enabled_job.png differ diff --git a/docs/images/solarwinds_external_integration.png b/docs/images/solarwinds_external_integration.png new file mode 100644 index 00000000..417181bc Binary files /dev/null and b/docs/images/solarwinds_external_integration.png differ diff --git a/docs/images/solarwinds_job_form.png b/docs/images/solarwinds_job_form.png new file mode 100644 index 00000000..52d05991 Binary files /dev/null and b/docs/images/solarwinds_job_form.png differ diff --git a/docs/images/solarwinds_job_list.png b/docs/images/solarwinds_job_list.png new file mode 100644 index 00000000..1c4932d7 Binary files /dev/null and b/docs/images/solarwinds_job_list.png differ diff --git a/docs/images/solarwinds_job_settings.png b/docs/images/solarwinds_job_settings.png new file mode 100644 index 00000000..2bd6dd85 Binary files /dev/null and b/docs/images/solarwinds_job_settings.png differ diff --git a/docs/images/solarwinds_jobresult.png b/docs/images/solarwinds_jobresult.png new file mode 100644 index 00000000..8256e880 Binary files /dev/null and b/docs/images/solarwinds_jobresult.png differ diff --git a/docs/images/solarwinds_password_secret.png b/docs/images/solarwinds_password_secret.png new file mode 100644 index 00000000..46e77cb8 Binary files /dev/null and b/docs/images/solarwinds_password_secret.png differ diff --git a/docs/images/solarwinds_secretsgroup.png b/docs/images/solarwinds_secretsgroup.png new file mode 100644 index 00000000..e0d5b1cf Binary files /dev/null and b/docs/images/solarwinds_secretsgroup.png differ diff --git a/docs/images/solarwinds_username_secret.png b/docs/images/solarwinds_username_secret.png new file mode 100644 index 00000000..9cd9c837 Binary files /dev/null and b/docs/images/solarwinds_username_secret.png differ diff --git a/docs/user/integrations/index.md b/docs/user/integrations/index.md index 1d5f499d..92818ea0 100644 --- a/docs/user/integrations/index.md +++ b/docs/user/integrations/index.md @@ -14,3 +14,4 @@ This Nautobot app supports the following integrations: - [Cisco Meraki](./meraki.md) - [ServiceNow](./servicenow.md) - [Slurpit](./slurpit.md) +- [SolarWinds](./solarwinds.md) diff --git a/docs/user/integrations/solarwinds.md b/docs/user/integrations/solarwinds.md new file mode 100644 index 00000000..6ebb9215 --- /dev/null +++ b/docs/user/integrations/solarwinds.md @@ -0,0 +1,110 @@ +# SolarWinds SSoT Integration + +The SolarWinds integration is built as part of the [Nautobot Single Source of Truth (SSoT)](https://github.com/nautobot/nautobot-app-ssot) app. The SSoT app enables Nautobot to be the aggregation point for data coming from multiple systems of record (SoR). 
From SolarWinds into Nautobot, it synchronizes the following objects:

| SolarWinds       | Nautobot         |
| ---------------- | ---------------- |
| Container        | Location*        |
| Devices          | Devices          |
| Vendor           | Manufacturers    |
| Model/DeviceType | DeviceTypes      |
| Model/Vendor     | Platforms        |
| Versions         | SoftwareVersions |
| Interfaces       | Interfaces       |
| IP Addresses     | IP Addresses     |

## Usage

Once the app is installed and configured, you will be able to perform an inventory ingestion from SolarWinds Orion into Nautobot. From the Nautobot SSoT Dashboard view (`/plugins/ssot/`), or via Apps -> Single Source of Truth -> Dashboard, SolarWinds will show as a Data Source.

![Dashboard View](../../images/solarwinds_dashboard.png)

From the Dashboard, you can also view more information about the App by clicking on the `SolarWinds to Nautobot` link to see the Detail view. This view shows the mappings of SolarWinds objects to Nautobot objects, the sync history, and other configuration details for the App:

![Detail View](../../images/solarwinds_detail-view.png)

In order to utilize this integration, you must first enable the Job. You can find the available installed Jobs under Jobs -> Jobs:

![Job List](../../images/solarwinds_job_list.png)

To enable the Job, click the orange pencil icon to the right of the `SolarWinds to Nautobot` Job. You will be presented with the settings for the Job as shown below:

![Job Settings](../../images/solarwinds_job_settings.png)

You'll need to check the `Enabled` checkbox and then click the `Update` button at the bottom of the page. You will then see that the play button next to the Job changes to blue and becomes functional, linking to the Job form.

![Enabled Job](../../images/solarwinds_enabled_job.png)

Once the Job is enabled, you'll need to manually create a few objects in Nautobot to use with the Job. First, you'll need to create Secrets that contain the SolarWinds username and password used to authenticate to your desired SolarWinds instance:

![Username Secret](../../images/solarwinds_username_secret.png)

![Password Secret](../../images/solarwinds_password_secret.png)

Once the required Secrets are created, you'll need to create a SecretsGroup that pairs them together and defines the Access Type as HTTP(S), as shown below:

![SolarWinds SecretsGroup](../../images/solarwinds_secretsgroup.png)

With the SecretsGroup containing your instance credentials defined, you'll then need to create an ExternalIntegration object to store the information about the SolarWinds instance you wish to synchronize with.

![SolarWinds ExternalIntegration](../../images/solarwinds_external_integration.png)

> The only required portions are the Name, Remote URL, Verify SSL, HTTP Method (GET), and Secrets Group.

- The External Integration will need its `http_method` set to `GET`.
- Keep the `verify_ssl` setting in mind; uncheck it if you are using untrusted certificates.

Extra settings can be configured in the Extra Config section of your External Integration; an example is shown below:

| Setting    | Default | Description                                                                        |
| ---------- | ------- | ---------------------------------------------------------------------------------- |
| port       | 17774   | TCP port used for communication to the API                                         |
| retries    | 5       | How many retries before considering the connection to SolarWinds failed            |
| batch_size | 100     | How many nodes to include in queries; this can be lowered to prevent API timeouts  |

```json
{
    "port": 443,
    "retries": 10,
    "batch_size": 100
}
```

With those configured, you will then need to ensure you have the Locations and Location Types defined to be used for the imported Devices. With those created, you can run the Job to start the synchronization:

![Job Form](../../images/solarwinds_job_form.png)

If you wish to test the synchronization without creating any data in Nautobot, select the `Dryrun` toggle. Clicking the `Debug` toggle will enable more verbose logging to inform you of what is occurring behind the scenes. After those toggles, there are dropdowns that allow you to specify the SolarWinds instance to synchronize with and to define the LocationType to use for the Devices imported from SolarWinds. In addition, there are also some optional settings on the Job form:

- You can choose to pull all devices from a specific SolarWinds Container (and subcontainers), or you can use a SolarWinds CustomProperty. This CustomProperty should be a Boolean set to `True`, and assigned to all devices you wish to sync. Enter the name of this CustomProperty into the CustomProperty field.
- If pulling from CustomProperty, you must choose the Location to place devices using the Location Override option, and should still choose the Container or ALL Containers.
- If the LocationType that you specify for the imported Devices requires a parent LocationType to be assigned, you must also select the Parent LocationType.

In addition, there are a few methods provided to assign Roles to your imported Devices. You can choose a Default Role to be used for all Devices not mapped via a method below.

The Role Matching Attribute can be set to DeviceType or Hostname. You then provide a `role_map` to associate specific DeviceTypes or Hostnames to a Role name. This map should be a standard Python dictionary if using DeviceType. Regex can be used to match Hostnames. Examples below:

```python title="Role_Map using DeviceType"
{
    "C8300": "ROUTER_ROLE",
    "C9200": "SWITCH_ROLE"
}
```
```python title="Role_Map using Hostname and Regex"
{
    "CORE.*": "ROUTER_ROLE",
    "WLC.*": "WLC_ROLE"
}
```

Finally, there is an option to specify a Tenant to be assigned to the imported Devices, Prefixes, and IPAddresses. This is handy for cases where you have multiple SolarWinds instances that are used by differing business units.

!!! info
    Tenant names will also be used as Namespace names for any IP Addresses and Prefixes created; these Namespaces must be created by you beforehand!

Running this Job will redirect you to a `Nautobot Job Result` view.

![JobResult View](../../images/solarwinds_jobresult.png)

Once the Job has finished, you can click on the `SSoT Sync Details` button at the top right of the Job Result page to see detailed information about the data that was synchronized from SolarWinds and the outcome of the sync Job.
There are also more logs to check if you run into issues, that can be found under Apps -> Logs. diff --git a/mkdocs.yml b/mkdocs.yml index 9394f231..570d5b8e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -120,6 +120,7 @@ nav: - Cisco Meraki: "user/integrations/meraki.md" - ServiceNow: "user/integrations/servicenow.md" - Slurpit: "user/integrations/slurpit.md" + - SolarWinds: "user/integrations/solarwinds.md" - Modeling: "user/modeling.md" - Performance: "user/performance.md" - Frequently Asked Questions: "user/faq.md" @@ -139,6 +140,7 @@ nav: - Itential: "admin/integrations/itential_setup.md" - Cisco Meraki: "admin/integrations/meraki_setup.md" - ServiceNow: "admin/integrations/servicenow_setup.md" + - SolarWinds: "admin/integrations/solarwinds_setup.md" - Slurpit: "admin/integrations/slurpit_setup.md" - Upgrade: "admin/upgrade.md" - Uninstall: "admin/uninstall.md" diff --git a/nautobot_ssot/__init__.py b/nautobot_ssot/__init__.py index 268842ac..0ddc52c5 100644 --- a/nautobot_ssot/__init__.py +++ b/nautobot_ssot/__init__.py @@ -26,6 +26,7 @@ "nautobot_ssot_itential", "nautobot_ssot_meraki", "nautobot_ssot_servicenow", + "nautobot_ssot_solarwinds", ] @@ -107,8 +108,10 @@ class NautobotSSOTAppConfig(NautobotAppConfig): "enable_citrix_adm": False, "enable_infoblox": False, "enable_ipfabric": False, + "enable_meraki": False, "enable_servicenow": False, "enable_slurpit": False, + "enable_solarwinds": False, "enable_itential": False, "hide_example_jobs": True, "ipfabric_api_token": "", diff --git a/nautobot_ssot/integrations/solarwinds/__init__.py b/nautobot_ssot/integrations/solarwinds/__init__.py new file mode 100644 index 00000000..78352050 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/__init__.py @@ -0,0 +1 @@ +"""Base module for Solarwinds integration.""" diff --git a/nautobot_ssot/integrations/solarwinds/constants.py b/nautobot_ssot/integrations/solarwinds/constants.py new file mode 100644 index 00000000..15a2f262 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/constants.py @@ -0,0 +1,24 @@ +"""Constants to be used with Solarwinds SSoT.""" + +ETH_INTERFACE_NAME_MAP = { + "AppGigabitEthernet": "virtual", + "FastEthernet": "100base-tx", + "GigabitEthernet": "1000base-t", + "FiveGigabitEthernet": "5gbase-t", + "TenGigabitEthernet": "10gbase-t", + "TwentyFiveGigE": "25gbase-x-sfp28", + "FortyGigabitEthernet": "40gbase-x-qsfpp", + "FiftyGigabitEthernet": "50gbase-x-sfp28", + "HundredGigE": "100gbase-x-qsfp28", +} + +ETH_INTERFACE_SPEED_MAP = { + "100Mbps": "100base-tx", + "1Gbps": "1000base-t", + "5Gbps": "5gbase-t", + "10Gbps": "10gbase-t", + "25Gbps": "25gbase-x-sfp28", + "40Gbps": "40gbase-x-qsfpp", + "50Gbps": "50gbase-x-sfp28", + "100Gbps": "100gbase-x-qsfp28", +} diff --git a/nautobot_ssot/integrations/solarwinds/diffsync/__init__.py b/nautobot_ssot/integrations/solarwinds/diffsync/__init__.py new file mode 100644 index 00000000..4ab64220 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/diffsync/__init__.py @@ -0,0 +1 @@ +"""DiffSync adapters and models for Solarwinds SSoT.""" diff --git a/nautobot_ssot/integrations/solarwinds/diffsync/adapters/__init__.py b/nautobot_ssot/integrations/solarwinds/diffsync/adapters/__init__.py new file mode 100644 index 00000000..c816eeb5 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/diffsync/adapters/__init__.py @@ -0,0 +1 @@ +"""Adapter classes for loading DiffSyncModels with data from Solarwinds or Nautobot.""" diff --git a/nautobot_ssot/integrations/solarwinds/diffsync/adapters/nautobot.py 
b/nautobot_ssot/integrations/solarwinds/diffsync/adapters/nautobot.py new file mode 100644 index 00000000..52201783 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/diffsync/adapters/nautobot.py @@ -0,0 +1,54 @@ +# pylint: disable=duplicate-code +"""Nautobot Adapter for Solarwinds SSoT app.""" + +from nautobot_ssot.contrib.adapter import NautobotAdapter as BaseNautobotAdapter +from nautobot_ssot.integrations.solarwinds.diffsync.models.base import ( + DeviceModel, + DeviceTypeModel, + InterfaceModel, + IPAddressModel, + LocationModel, + ManufacturerModel, + PlatformModel, + PrefixModel, + RoleModel, + SoftwareVersionModel, +) +from nautobot_ssot.integrations.solarwinds.diffsync.models.nautobot import ( + NautobotIPAddressToInterfaceModel, +) + + +class NautobotAdapter(BaseNautobotAdapter): + """DiffSync adapter for Nautobot.""" + + location = LocationModel + platform = PlatformModel + role = RoleModel + manufacturer = ManufacturerModel + device_type = DeviceTypeModel + softwareversion = SoftwareVersionModel + device = DeviceModel + interface = InterfaceModel + prefix = PrefixModel + ipaddress = IPAddressModel + ipassignment = NautobotIPAddressToInterfaceModel + + top_level = [ + "location", + "manufacturer", + "platform", + "role", + "softwareversion", + "device", + "prefix", + "ipaddress", + "ipassignment", + ] + + def load_param_mac_address(self, parameter_name, database_object): + """Custom loader for 'mac_address' parameter.""" + mac_addr = getattr(database_object, parameter_name) + if mac_addr is not None: + return str(mac_addr) + return mac_addr diff --git a/nautobot_ssot/integrations/solarwinds/diffsync/adapters/solarwinds.py b/nautobot_ssot/integrations/solarwinds/diffsync/adapters/solarwinds.py new file mode 100644 index 00000000..7335abc3 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/diffsync/adapters/solarwinds.py @@ -0,0 +1,508 @@ +"""Nautobot SSoT Solarwinds Adapter for Solarwinds SSoT app.""" + +import json +from datetime import datetime +from typing import Dict, List, Optional + +from diffsync import Adapter, DiffSyncModel +from diffsync.enum import DiffSyncModelFlags +from netutils.ip import ipaddress_interface, is_ip_within +from netutils.mac import mac_to_format + +from nautobot_ssot.integrations.solarwinds.diffsync.models.solarwinds import ( + SolarwindsDevice, + SolarwindsDeviceType, + SolarwindsInterface, + SolarwindsIPAddress, + SolarwindsIPAddressToInterface, + SolarwindsLocation, + SolarwindsManufacturer, + SolarwindsPlatform, + SolarwindsPrefix, + SolarwindsRole, + SolarwindsSoftwareVersion, +) +from nautobot_ssot.integrations.solarwinds.utils.solarwinds import ( + SolarwindsClient, + determine_role_from_devicetype, + determine_role_from_hostname, +) + + +class SolarwindsAdapter(Adapter): # pylint: disable=too-many-instance-attributes + """DiffSync adapter for Solarwinds.""" + + location = SolarwindsLocation + platform = SolarwindsPlatform + role = SolarwindsRole + manufacturer = SolarwindsManufacturer + device_type = SolarwindsDeviceType + softwareversion = SolarwindsSoftwareVersion + device = SolarwindsDevice + interface = SolarwindsInterface + prefix = SolarwindsPrefix + ipaddress = SolarwindsIPAddress + ipassignment = SolarwindsIPAddressToInterface + + top_level = [ + "location", + "manufacturer", + "platform", + "role", + "softwareversion", + "device", + "prefix", + "ipaddress", + "ipassignment", + ] + + def __init__( # pylint: disable=too-many-arguments + self, + client: SolarwindsClient, + containers, + location_type, + job, + sync=None, 
+ parent=None, + tenant=None, + ): + """Initialize Solarwinds. + + Args: + job (object, optional): Solarwinds job. Defaults to None. + sync (object, optional): SolarwindsDataSource Sync. Defaults to None. + client (SolarwindsClient): Solarwinds API client connection object. + containers (str): Concatenated string of Container names to be imported. Will be 'ALL' for all containers. + location_type (LocationType): The LocationType to create containers as in Nautobot. + parent (Location, optional): The parent Location to assign created containers to in Nautobot. + tenant (Tenant, optional): The Tenant to associate with Devices and IPAM data. + """ + super().__init__() + self.job = job + self.sync = sync + self.conn = client + self.containers = containers + self.location_type = location_type + self.parent = parent + self.tenant = tenant + self.failed_devices = [] + + def load(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements + """Load data from Solarwinds into DiffSync models.""" + self.job.logger.info("Loading data from Solarwinds.") + + if self.parent: + self.load_parent() + + if self.job.pull_from == "CustomProperty" and self.job.location_override: + container_nodes = self.get_nodes_custom_property( + custom_property=self.job.custom_property, location=self.job.location_override + ) + else: + container_nodes = self.get_container_nodes(custom_property=self.job.custom_property) + + self.load_sites(container_nodes) + + node_details = {} + for container_name, nodes in container_nodes.items(): # pylint: disable=too-many-nested-blocks + self.job.logger.debug(f"Retrieving node details from Solarwinds for {container_name}.") + node_details = self.conn.build_node_details(nodes=nodes) + for node in node_details.values(): + device_type = self.conn.standardize_device_type(node=node) + role = self.determine_device_role(node, device_type) + self.load_role(role) + if device_type: + platform_name = self.load_platform(device_type, manufacturer=node.get("Vendor")) + if platform_name == "UNKNOWN": + self.job.logger.error(f"Can't determine platform for {node['NodeHostname']} so skipping load.") + self.failed_devices.append({**node, **{"error": "Unable to determine Platform."}}) + continue + if node.get("Vendor") and node["Vendor"] != "net-snmp": + self.load_manufacturer_and_device_type(manufacturer=node["Vendor"], device_type=device_type) + version = self.conn.extract_version(version=node["Version"]) if node.get("Version") else "" + if version: + self.get_or_instantiate( + self.softwareversion, + ids={"version": version, "platform__name": platform_name, "status__name": "Active"}, + attrs={}, + ) + new_dev, loaded = self.get_or_instantiate( + self.device, + ids={ + "name": node["NodeHostname"], + }, + attrs={ + "device_type__manufacturer__name": node["Vendor"], + "device_type__model": device_type, + "location__name": container_name, + "location__location_type__name": self.location_type.name, + "platform__name": platform_name, + "role__name": role, + "snmp_location": node["SNMPLocation"] if node.get("SNMPLocation") else None, + "software_version__version": version if version else None, + "software_version__platform__name": platform_name if version else None, + "last_synced_from_sor": datetime.today().date().isoformat(), + "status__name": "Active", + "serial": node["ServiceTag"] if node.get("ServiceTag") else "", + "tenant__name": self.tenant.name if self.tenant else None, + "system_of_record": "Solarwinds", + }, + ) + if loaded: + if node.get("interfaces"): + 
self.load_interfaces(device=new_dev, intfs=node["interfaces"]) + if not node.get("ipaddrs") or ( + node.get("ipaddrs") and node["IPAddress"] not in node["ipaddrs"] + ): + prefix = ipaddress_interface( + ip=f"{node['IPAddress']}/{node['PFLength']}", attr="network" + ).with_prefixlen + self.load_prefix(network=prefix) + self.load_ipaddress( + addr=node["IPAddress"], + prefix_length=node["PFLength"], + prefix=prefix, + addr_type="IPv6" if ":" in node["IPAddress"] else "IPv4", + ) + self.load_interfaces( + device=new_dev, + intfs={1: {"Name": "Management", "Enabled": "Up", "Status": "Up"}}, + ) + self.load_ipassignment( + addr=node["IPAddress"], + dev_name=new_dev.name, + intf_name="Management", + addr_type="IPv6" if ":" in node["IPAddress"] else "IPv4", + mgmt_addr=node["IPAddress"], + ) + if node.get("ipaddrs"): + for _, ipaddr in node["ipaddrs"].items(): + pf_len = ipaddr["SubnetMask"] + prefix = ipaddress_interface( + f"{ipaddr['IPAddress']}/{pf_len}", "network" + ).with_prefixlen + self.load_prefix(network=prefix) + self.load_ipaddress( + addr=ipaddr["IPAddress"], + prefix_length=pf_len, + prefix=prefix, + addr_type=ipaddr["IPAddressType"], + ) + if ipaddr["IntfName"] not in node["interfaces"]: + self.load_interfaces( + device=new_dev, + intfs={1: {"Name": ipaddr["IntfName"], "Enabled": "Up", "Status": "Up"}}, + ) + self.load_ipassignment( + addr=ipaddr["IPAddress"], + dev_name=new_dev.name, + intf_name=ipaddr["IntfName"], + addr_type=ipaddr["IPAddressType"], + mgmt_addr=node["IPAddress"], + ) + else: + if node.get("Vendor") and node["Vendor"] == "net-snmp": + self.job.logger.error(f"{node['NodeHostname']} is showing as net-snmp so won't be imported.") + else: + self.job.logger.error(f"{node['NodeHostname']} is missing DeviceType so won't be imported.") + self.failed_devices.append({**node, **{"error": "Unable to determine DeviceType."}}) + + self.reprocess_ip_parent_prefixes() + if node_details and self.job.debug: + self.job.logger.debug(f"Node details: {json.dumps(node_details, indent=2)}") + if self.failed_devices: + self.job.logger.warning( + f"List of {len(self.failed_devices)} devices that were unable to be loaded. {json.dumps(self.failed_devices, indent=2)}" + ) + + def load_manufacturer_and_device_type(self, manufacturer: str, device_type: str): + """Load Manufacturer and DeviceType into DiffSync models. + + Args: + manufacturer (str): Name of manufacturer to be loaded. + device_type (str): DeviceType to be loaded. 
+ """ + manu, _ = self.get_or_instantiate(self.manufacturer, ids={"name": manufacturer}, attrs={}) + new_dt, loaded = self.get_or_instantiate( + self.device_type, + ids={"model": device_type, "manufacturer__name": manufacturer}, + attrs={}, + ) + if loaded: + manu.add_child(new_dt) + + def get_nodes_custom_property(self, custom_property, location): + """Gather nodes with customproperty from Solarwinds.""" + nodes = {location.name: self.conn.get_nodes_custom_property(custom_property)} + return nodes + + def get_container_nodes(self, custom_property=None): + """Gather container nodes for all specified containers from Solarwinds.""" + container_ids, container_nodes = {}, {} + if self.containers != "ALL": + container_ids = self.conn.get_filtered_container_ids(containers=self.containers) + else: + container_ids = self.conn.get_top_level_containers(top_container=self.job.top_container) + container_nodes = self.conn.get_container_nodes(container_ids, custom_property) + return container_nodes + + def load_location( # pylint: disable=too-many-arguments + self, + loc_name: str, + location_type: str, + status: str, + parent_name: Optional[str] = None, + parent_type: Optional[str] = None, + parent_parent_name: Optional[str] = None, + parent_parent_type: Optional[str] = None, + ) -> tuple: + """Load location into DiffSync model. + + Args: + loc_name (str): Location name to load. + location_type (str): LocationType for Location to be loaded. + parent_name (str, optional): Name for parent of Location. Defaults to None. + parent_type (str, optional): LocationType for parent of Location. Defaults to None. + parent_parent_name (str, optional): Name for parent of parent of Location. Defaults to None. + parent_parent_type (str, optional): LocationType for parent of parent of Location. Defaults to None. + status (str): Status of Location to be loaded. + + Returns: + tuple: Location DiffSync model and if it was loaded. + """ + location, loaded = self.get_or_instantiate( + self.location, + ids={ + "name": loc_name, + "location_type__name": location_type, + "parent__name": parent_name, + "parent__location_type__name": parent_type, + "parent__parent__name": parent_parent_name, + "parent__parent__location_type__name": parent_parent_type, + }, + attrs={"status__name": status}, + ) + + return (location, loaded) + + def load_parent(self): + """Function to load parent Location into Location DiffSync model.""" + parent, loaded = self.load_location( + loc_name=self.parent.name, + location_type=self.parent.location_type.name, + status=self.parent.status.name, + parent_name=self.parent.parent.name if self.parent and self.parent.parent else None, + parent_type=self.parent.parent.location_type.name if self.parent and self.parent.parent else None, + parent_parent_name=( + self.parent.parent.parent.name + if self.parent and self.parent.parent and self.parent.parent.parent + else None + ), + parent_parent_type=( + self.parent.parent.parent.location_type.name + if self.parent and self.parent.parent and self.parent.parent.parent + else None + ), + ) + if loaded: + parent.model_flags = DiffSyncModelFlags.SKIP_UNMATCHED_DST + + def load_sites(self, container_nodes: Dict[str, List[dict]]): + """Load containers as LocationType into Location DiffSync models. + + Args: + container_nodes (Dict[str, List[dict]]): Dictionary of Container to list of dictionaries containing nodes within that container. 
+ """ + for container_name, node_list in container_nodes.items(): + self.job.logger.debug(f"Found {len(node_list)} nodes for {container_name} container.") + self.load_location( + loc_name=container_name, + location_type=self.location_type.name, + parent_name=self.parent.name if self.parent else None, + parent_type=self.parent.location_type.name if self.parent else None, + parent_parent_name=self.parent.parent.name if self.parent and self.parent.parent else None, + parent_parent_type=( + self.parent.parent.location_type.name if self.parent and self.parent.parent else None + ), + status="Active", + ) + + def determine_device_role(self, node: dict, device_type: str) -> str: + """Determine Device Role based upon role_choice setting. + + Args: + node (dict): Dictionary of Node details. + device_type (str): DeviceType model. + + Returns: + str: Device Role from DeviceType, Hostname, or default Role. + """ + role = "" + if self.job.role_map and self.job.role_choice == "DeviceType": + role = determine_role_from_devicetype(device_type=device_type, role_map=self.job.role_map) + if self.job.role_map and self.job.role_choice == "Hostname": + role = determine_role_from_hostname(hostname=node["NodeHostname"], role_map=self.job.role_map) + if not role: + role = self.job.default_role.name + return role + + def load_role(self, role): + """Load passed Role into DiffSync model.""" + self.get_or_instantiate( + self.role, ids={"name": role}, attrs={"content_types": [{"app_label": "dcim", "model": "device"}]} + ) + + def load_platform(self, device_type: str, manufacturer: str): # pylint: disable=inconsistent-return-statements + """Load Platform into DiffSync model based upon DeviceType. + + Args: + device_type (str): DeviceType name for associated Platform. + manufacturer (str): Manufacturer name for associated Platform. + """ + if "Aruba" in manufacturer: + self.get_or_instantiate( + self.platform, + ids={"name": "arubanetworks.aoscx", "manufacturer__name": manufacturer}, + attrs={"network_driver": "aruba_aoscx", "napalm_driver": ""}, + ) + return "arubanetworks.aoscx" + if "Cisco" in manufacturer: + if not device_type.startswith("N"): + self.get_or_instantiate( + self.platform, + ids={"name": "cisco.ios.ios", "manufacturer__name": manufacturer}, + attrs={"network_driver": "cisco_ios", "napalm_driver": "ios"}, + ) + return "cisco.ios.ios" + if device_type.startswith("N"): + self.get_or_instantiate( + self.platform, + ids={"name": "cisco.nxos.nxos", "manufacturer__name": manufacturer}, + attrs={"network_driver": "cisco_nxos", "napalm_driver": "nxos"}, + ) + return "cisco.nxos.nxos" + elif "Palo" in manufacturer: + self.get_or_instantiate( + self.platform, + ids={"name": "paloaltonetworks.panos.panos", "manufacturer__name": manufacturer}, + attrs={"network_driver": "paloalto_panos", "napalm_driver": ""}, + ) + return "paloaltonetworks.panos.panos" + return "UNKNOWN" + + def load_interfaces(self, device: DiffSyncModel, intfs: dict) -> None: + """Load interfaces for passed device. + + Args: + device (DiffSyncModel): DiffSync Device model that's been loaded. + intfs (dict): Interface data for Device. 
+ """ + for _, intf in intfs.items(): + new_intf, loaded = self.get_or_instantiate( + self.interface, + ids={"name": intf["Name"], "device__name": device.name}, + attrs={ + "enabled": bool(intf["Enabled"] == "Up"), + "mac_address": mac_to_format(intf["MAC"], "MAC_COLON_TWO") if intf.get("MAC") else None, + "mtu": intf["MTU"] if intf.get("MTU") else 1500, + "type": self.conn.determine_interface_type(interface=intf), + "status__name": "Active" if intf["Status"] == "Up" else "Failed", + }, + ) + if loaded: + device.add_child(new_intf) + + def reprocess_ip_parent_prefixes(self) -> None: + """Check for an existing more specific prefix. + + Runs after loading all data to ensure IP's have appropriate parent prefixes. + """ + for ipaddr in self.get_all(obj="ipaddress"): + parent_subnet = f"{ipaddr.parent__network}/{ipaddr.parent__prefix_length}" + for prefix in self.get_all(obj="prefix"): + if not prefix.namespace__name == ipaddr.parent__namespace__name: + continue + subnet = f"{prefix.network}/{prefix.prefix_length}" + if not is_ip_within(parent_subnet, subnet): + if is_ip_within(ipaddr.host, subnet): + if self.job.debug: + self.job.logger.debug( + "More specific subnet %s found for IP %s/%s", subnet, ipaddr.host, ipaddr.mask_length + ) + ipaddr.parent__network = prefix.network + ipaddr.parent__prefix_length = prefix.prefix_length + self.update(ipaddr) + + def load_prefix(self, network: str) -> None: + """Load Prefix for passed network. + + Args: + network (str): Prefix network to be loaded. + """ + self.get_or_instantiate( + self.prefix, + ids={ + "network": network.split("/")[0], + "prefix_length": network.split("/")[1], + "namespace__name": self.tenant.name if self.tenant else "Global", + }, + attrs={ + "status__name": "Active", + "tenant__name": self.tenant.name if self.tenant else None, + "last_synced_from_sor": datetime.today().date().isoformat(), + "system_of_record": "Solarwinds", + }, + ) + + def load_ipaddress(self, addr: str, prefix_length: int, prefix: str, addr_type: str) -> None: + """Load IPAddress for passed address. + + Args: + addr (str): Host for IPAddress. + prefix_length (int): Prefix length for IPAddress. + prefix (str): Parent prefix CIDR for IPAddress. + addr_type (str): Either "IPv4" or "IPv6" + """ + self.get_or_instantiate( + self.ipaddress, + ids={ + "host": addr, + "parent__network": prefix.split("/")[0], + "parent__prefix_length": prefix_length, + "parent__namespace__name": self.tenant.name if self.tenant else "Global", + }, + attrs={ + "mask_length": prefix_length, + "status__name": "Active", + "ip_version": 4 if addr_type == "IPv4" else 6, + "tenant__name": self.tenant.name if self.tenant else None, + "last_synced_from_sor": datetime.today().date().isoformat(), + "system_of_record": "Solarwinds", + }, + ) + + def load_ipassignment( # pylint: disable=too-many-arguments + self, + addr: str, + dev_name: str, + intf_name: str, + addr_type: str, + mgmt_addr: str, + ) -> None: + """Load IPAddress for passed address. + + Args: + addr (str): Host for IPAddress. + dev_name (str): Device name for associated Interface. + intf_name (str): Interface name to associate IPAddress to. + addr_type (str): Either "IPv4" or "IPv6" + mgmt_addr (str): Management IP Address for Device. 
+ """ + self.get_or_instantiate( + self.ipassignment, + ids={"interface__device__name": dev_name, "interface__name": intf_name, "ip_address__host": addr}, + attrs={ + "interface__device__primary_ip4__host": mgmt_addr if addr_type == "IPv4" else None, + "interface__device__primary_ip6__host": mgmt_addr if addr_type == "IPv6" else None, + }, + ) diff --git a/nautobot_ssot/integrations/solarwinds/diffsync/models/__init__.py b/nautobot_ssot/integrations/solarwinds/diffsync/models/__init__.py new file mode 100644 index 00000000..d768b5ad --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/diffsync/models/__init__.py @@ -0,0 +1 @@ +"""DiffSync models and adapters for the Solarwinds SSoT app.""" diff --git a/nautobot_ssot/integrations/solarwinds/diffsync/models/base.py b/nautobot_ssot/integrations/solarwinds/diffsync/models/base.py new file mode 100644 index 00000000..d35a0737 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/diffsync/models/base.py @@ -0,0 +1,290 @@ +# pylint: disable=R0801 +"""DiffSyncModel subclasses for Nautobot-to-Solarwinds data sync.""" + +try: + from typing import Annotated # Python>=3.9 +except ImportError: + from typing_extensions import Annotated # Python<3.9 + +from typing import List, Optional + +from diffsync.enum import DiffSyncModelFlags +from nautobot.dcim.models import Device, DeviceType, Interface, Location, Manufacturer, Platform, SoftwareVersion +from nautobot.extras.models import Role +from nautobot.ipam.models import IPAddress, IPAddressToInterface, Prefix + +from nautobot_ssot.contrib.model import NautobotModel +from nautobot_ssot.contrib.types import CustomFieldAnnotation +from nautobot_ssot.tests.contrib_base_classes import ContentTypeDict + + +class LocationModel(NautobotModel): + """Diffsync model for Solarwinds containers.""" + + model_flags: DiffSyncModelFlags = DiffSyncModelFlags.SKIP_UNMATCHED_DST + + _model = Location + _modelname = "location" + _identifiers = ( + "name", + "location_type__name", + "parent__name", + "parent__location_type__name", + "parent__parent__name", + "parent__parent__location_type__name", + ) + _attributes = ("status__name",) + _children = {} + + name: str + location_type__name: str + status__name: str + parent__name: Optional[str] = None + parent__location_type__name: Optional[str] = None + parent__parent__name: Optional[str] = None + parent__parent__location_type__name: Optional[str] = None + + +class DeviceTypeModel(NautobotModel): + """DiffSync model for Solarwinds device types.""" + + model_flags: DiffSyncModelFlags = DiffSyncModelFlags.SKIP_UNMATCHED_DST + + _model = DeviceType + _modelname = "device_type" + _identifiers = ("model", "manufacturer__name") + + model: str + manufacturer__name: str + + +class ManufacturerModel(NautobotModel): + """DiffSync model for Solarwinds device manufacturers.""" + + model_flags: DiffSyncModelFlags = DiffSyncModelFlags.SKIP_UNMATCHED_DST + + _model = Manufacturer + _modelname = "manufacturer" + _identifiers = ("name",) + _children = {"device_type": "device_types"} + + name: str + device_types: List[DeviceTypeModel] = [] + + +class PlatformModel(NautobotModel): + """Shared data model representing a Platform in either of the local or remote Nautobot instances.""" + + model_flags: DiffSyncModelFlags = DiffSyncModelFlags.SKIP_UNMATCHED_DST + + _model = Platform + _modelname = "platform" + _identifiers = ("name", "manufacturer__name") + _attributes = ("network_driver", "napalm_driver") + + name: str + manufacturer__name: str + network_driver: str + napalm_driver: str + + 
+class RoleModel(NautobotModel): + """DiffSync model for Solarwinds Device roles.""" + + model_flags: DiffSyncModelFlags = DiffSyncModelFlags.SKIP_UNMATCHED_DST + + _model = Role + _modelname = "role" + _identifiers = ("name",) + _attributes = ("content_types",) + + name: str + content_types: List[ContentTypeDict] = [] + + +class SoftwareVersionModel(NautobotModel): + """DiffSync model for Solarwinds Device Software versions.""" + + model_flags: DiffSyncModelFlags = DiffSyncModelFlags.SKIP_UNMATCHED_DST + + _model = SoftwareVersion + _modelname = "softwareversion" + _identifiers = ("version", "platform__name") + _attributes = ("status__name",) + + version: str + platform__name: str + status__name: str + + +class DeviceModel(NautobotModel): + """DiffSync model for Solarwinds devices.""" + + _model = Device + _modelname = "device" + _identifiers = ("name",) + _attributes = ( + "status__name", + "device_type__manufacturer__name", + "device_type__model", + "location__name", + "location__location_type__name", + "platform__name", + "role__name", + "serial", + "snmp_location", + "software_version__version", + "software_version__platform__name", + "last_synced_from_sor", + "system_of_record", + "tenant__name", + ) + _children = {"interface": "interfaces"} + + name: str + device_type__manufacturer__name: str + device_type__model: str + location__name: str + location__location_type__name: str + platform__name: str + role__name: str + serial: str + software_version__version: Optional[str] = None + software_version__platform__name: Optional[str] = None + status__name: str + tenant__name: Optional[str] = None + + interfaces: Optional[List["InterfaceModel"]] = [] + + snmp_location: Annotated[Optional[str], CustomFieldAnnotation(name="snmp_location")] = None + system_of_record: Annotated[Optional[str], CustomFieldAnnotation(name="system_of_record")] = None + last_synced_from_sor: Annotated[Optional[str], CustomFieldAnnotation(name="last_synced_from_sor")] = None + + @classmethod + def get_queryset(cls): + """Return only Devices with system_of_record set to Solarwinds.""" + return Device.objects.filter(_custom_field_data__system_of_record="Solarwinds") + + +class InterfaceModel(NautobotModel): + """Shared data model representing an Interface.""" + + # Metadata about this model + _model = Interface + _modelname = "interface" + _identifiers = ("name", "device__name") + _attributes = ( + "enabled", + "mac_address", + "mtu", + "type", + "status__name", + ) + _children = {} + + name: str + device__name: str + enabled: bool + mac_address: Optional[str] = None + mtu: int + type: str + status__name: str + + @classmethod + def get_queryset(cls): + """Return only Interfaces with system_of_record set to Solarwinds.""" + return Interface.objects.filter(device___custom_field_data__system_of_record="Solarwinds") + + +class PrefixModel(NautobotModel): + """Shared data model representing a Prefix.""" + + # Metadata about this model + _model = Prefix + _modelname = "prefix" + _identifiers = ( + "network", + "prefix_length", + "namespace__name", + ) + _attributes = ( + "status__name", + "tenant__name", + "last_synced_from_sor", + "system_of_record", + ) + + # Data type declarations for all identifiers and attributes + network: str + prefix_length: int + status__name: str + tenant__name: Optional[str] = None + namespace__name: str + system_of_record: Annotated[Optional[str], CustomFieldAnnotation(name="system_of_record")] = None + last_synced_from_sor: Annotated[Optional[str], 
CustomFieldAnnotation(name="last_synced_from_sor")] = None + + @classmethod + def get_queryset(cls): + """Return only Prefixes with system_of_record set to Solarwinds.""" + return Prefix.objects.filter(_custom_field_data__system_of_record="Solarwinds") + + +class IPAddressModel(NautobotModel): + """Shared data model representing an IPAddress.""" + + _model = IPAddress + _modelname = "ipaddress" + _identifiers = ( + "host", + "parent__network", + "parent__prefix_length", + "parent__namespace__name", + ) + _attributes = ( + "mask_length", + "status__name", + "ip_version", + "tenant__name", + "last_synced_from_sor", + "system_of_record", + ) + + host: str + mask_length: int + parent__network: str + parent__prefix_length: int + parent__namespace__name: str + status__name: str + ip_version: int + tenant__name: Optional[str] = None + system_of_record: Annotated[Optional[str], CustomFieldAnnotation(name="system_of_record")] = None + last_synced_from_sor: Annotated[Optional[str], CustomFieldAnnotation(name="last_synced_from_sor")] = None + + @classmethod + def get_queryset(cls): + """Return only IP Addresses with system_of_record set to Solarwinds.""" + return IPAddress.objects.filter(_custom_field_data__system_of_record="Solarwinds") + + +class IPAddressToInterfaceModel(NautobotModel): + """Shared data model representing an IPAddressToInterface.""" + + _model = IPAddressToInterface + _modelname = "ipassignment" + _identifiers = ("interface__device__name", "interface__name", "ip_address__host") + _attributes = ( + "interface__device__primary_ip4__host", + "interface__device__primary_ip6__host", + ) + _children = {} + + interface__device__name: str + interface__name: str + ip_address__host: str + interface__device__primary_ip4__host: Optional[str] = None + interface__device__primary_ip6__host: Optional[str] = None + + @classmethod + def get_queryset(cls): + """Return only IPAddressToInterface with system_of_record set to Solarwinds.""" + return IPAddressToInterface.objects.filter(interface__device___custom_field_data__system_of_record="Solarwinds") diff --git a/nautobot_ssot/integrations/solarwinds/diffsync/models/nautobot.py b/nautobot_ssot/integrations/solarwinds/diffsync/models/nautobot.py new file mode 100644 index 00000000..e0ef2457 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/diffsync/models/nautobot.py @@ -0,0 +1,72 @@ +# pylint: disable=no-member +"""Nautobot DiffSync models for Solarwinds SSoT.""" + +from nautobot.dcim.models import Interface +from nautobot.ipam.models import IPAddress, IPAddressToInterface + +from nautobot_ssot.integrations.solarwinds.diffsync.models.base import IPAddressToInterfaceModel + + +class NautobotIPAddressToInterfaceModel(IPAddressToInterfaceModel): + """IPAddressToInterface model for Nautobot.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create IPAddressToInterface in Nautobot.""" + if adapter.job.debug: + adapter.job.logger.debug(f"Creating IPAddressToInterface {ids} {attrs}") + intf = Interface.objects.get(name=ids["interface__name"], device__name=ids["interface__device__name"]) + + # try: + obj = IPAddressToInterface( + ip_address=IPAddress.objects.get(host=ids["ip_address__host"], tenant=intf.device.tenant), + interface=intf, + ) + obj.validated_save() + # except IPAddress.DoesNotExist as e: + # print(f"IP: {ids=}, {intf=}, {intf.device=}") + + if ( + attrs.get("interface__device__primary_ip4__host") + and ids["ip_address__host"] == attrs["interface__device__primary_ip4__host"] + ): + obj.interface.device.primary_ip4 = 
IPAddress.objects.get( + host=attrs["interface__device__primary_ip4__host"], + tenant=obj.interface.device.tenant, + ) + obj.interface.device.validated_save() + if ( + attrs.get("interface__device__primary_ip6__host") + and ids["ip_address__host"] == attrs["interface__device__primary_ip6__host"] + ): + obj.interface.device.primary_ip6 = IPAddress.objects.get( + host=attrs["interface__device__primary_ip6__host"], + tenant=obj.interface.device.tenant, + ) + obj.interface.device.validated_save() + return super().create_base(adapter=adapter, ids=ids, attrs=attrs) + + def update(self, attrs): + """Update IPAddressToInterface in Nautobot.""" + obj = self.get_from_db() + if ( + attrs.get("interface__device__primary_ip4__host") + and self.ip_address__host == attrs["interface__device__primary_ip4__host"] + ): + obj.interface.device.primary_ip4 = IPAddress.objects.get( + host=attrs["interface__device__primary_ip4__host"], tenant=obj.interface.device.tenant + ) + obj.interface.device.validated_save() + if ( + attrs.get("interface__device__primary_ip6__host") + and self.ip_address__host == attrs["interface__device__primary_ip6__host"] + ): + obj.interface.device.primary_ip6 = IPAddress.objects.get( + host=attrs["interface__device__primary_ip6__host"], tenant=obj.interface.device.tenant + ) + obj.interface.device.validated_save() + return super().update_base(attrs) + + def delete(self): + """Delete IPAddressToInterface in Nautobot.""" + return super().delete_base() diff --git a/nautobot_ssot/integrations/solarwinds/diffsync/models/solarwinds.py b/nautobot_ssot/integrations/solarwinds/diffsync/models/solarwinds.py new file mode 100644 index 00000000..0762df13 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/diffsync/models/solarwinds.py @@ -0,0 +1,202 @@ +"""Nautobot SSoT Solarwinds DiffSync models for Nautobot SSoT Solarwinds SSoT.""" + +from nautobot_ssot.integrations.solarwinds.diffsync.models.base import ( + DeviceModel, + DeviceTypeModel, + InterfaceModel, + IPAddressModel, + IPAddressToInterfaceModel, + LocationModel, + ManufacturerModel, + PlatformModel, + PrefixModel, + RoleModel, + SoftwareVersionModel, +) + + +class SolarwindsLocation(LocationModel): + """Solarwinds implementation of Location DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create Location in Solarwinds from SolarwindsLocation object.""" + raise NotImplementedError + + def update(self, attrs): + """Update Location in Solarwinds from SolarwindsLocation object.""" + raise NotImplementedError + + def delete(self): + """Delete Location in Solarwinds from SolarwindsLocation object.""" + raise NotImplementedError + + +class SolarwindsDeviceType(DeviceTypeModel): + """Solarwinds implementation of DeviceType DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create DeviceType in Solarwinds from SolarwindsDeviceType object.""" + raise NotImplementedError + + def update(self, attrs): + """Update DeviceType in Solarwinds from SolarwindsDeviceType object.""" + raise NotImplementedError + + def delete(self): + """Delete DeviceType in Solarwinds from SolarwindsDeviceType object.""" + raise NotImplementedError + + +class SolarwindsManufacturer(ManufacturerModel): + """Solarwinds implementation of Manufacturer DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create Manufacturer in Solarwinds from SolarwindsManufacturer object.""" + raise NotImplementedError + + def update(self, attrs): + """Update Manufacturer in Solarwinds from 
SolarwindsManufacturer object.""" + raise NotImplementedError + + def delete(self): + """Delete Manufacturer in Solarwinds from SolarwindsManufacturer object.""" + raise NotImplementedError + + +class SolarwindsPlatform(PlatformModel): + """Solarwinds implementation of Platform DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create Platform in Solarwinds from SolarwindsPlatform object.""" + raise NotImplementedError + + def update(self, attrs): + """Update Platform in Solarwinds from SolarwindsPlatform object.""" + raise NotImplementedError + + def delete(self): + """Delete Platform in Solarwinds from SolarwindsPlatform object.""" + raise NotImplementedError + + +class SolarwindsSoftwareVersion(SoftwareVersionModel): + """Solarwinds implementation of SoftwareVersion DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create SoftwareVersion in Solarwinds from SolarwindsSoftwareVersion object.""" + raise NotImplementedError + + def update(self, attrs): + """Update SoftwareVersion in Solarwinds from SolarwindsSoftwareVersion object.""" + raise NotImplementedError + + def delete(self): + """Delete SoftwareVersion in Solarwinds from SolarwindsSoftwareVersion object.""" + raise NotImplementedError + + +class SolarwindsRole(RoleModel): + """Solarwinds implementation of Role DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create Role in Solarwinds from SolarwindsRole object.""" + raise NotImplementedError + + def update(self, attrs): + """Update Role in Solarwinds from SolarwindsRole object.""" + raise NotImplementedError + + def delete(self): + """Delete Role in Solarwinds from SolarwindsRole object.""" + raise NotImplementedError + + +class SolarwindsDevice(DeviceModel): + """Solarwinds implementation of Device DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create Device in Solarwinds from SolarwindsDevice object.""" + raise NotImplementedError + + def update(self, attrs): + """Update Device in Solarwinds from SolarwindsDevice object.""" + raise NotImplementedError + + def delete(self): + """Delete Device in Solarwinds from SolarwindsDevice object.""" + raise NotImplementedError + + +class SolarwindsInterface(InterfaceModel): + """Solarwinds implementation of Interface DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create Interface in Solarwinds from SolarwindsInterface object.""" + raise NotImplementedError + + def update(self, attrs): + """Update Interface in Solarwinds from SolarwindsInterface object.""" + raise NotImplementedError + + def delete(self): + """Delete Interface in Solarwinds from SolarwindsInterface object.""" + raise NotImplementedError + + +class SolarwindsPrefix(PrefixModel): + """Solarwinds implementation of Prefix DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create Prefix in Solarwinds from SolarwindsPrefix object.""" + raise NotImplementedError + + def update(self, attrs): + """Update Prefix in Solarwinds from SolarwindsPrefix object.""" + raise NotImplementedError + + def delete(self): + """Delete Prefix in Solarwinds from SolarwindsPrefix object.""" + raise NotImplementedError + + +class SolarwindsIPAddress(IPAddressModel): + """Solarwinds implementation of IPAddress DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create IPAddress in Solarwinds from SolarwindsIPAddress object.""" + raise NotImplementedError + + def update(self, attrs): + 
"""Update IPAddress in Solarwinds from SolarwindsIPAddress object.""" + raise NotImplementedError + + def delete(self): + """Delete IPAddress in Solarwinds from SolarwindsIPAddress object.""" + raise NotImplementedError + + +class SolarwindsIPAddressToInterface(IPAddressToInterfaceModel): + """Solarwinds implementation of IPAddressToInterface DiffSync model.""" + + @classmethod + def create(cls, adapter, ids, attrs): + """Create IPAddressToInterface in Solarwinds from SolarwindsIPAddressToInterface object.""" + raise NotImplementedError + + def update(self, attrs): + """Update IPAddressToInterface in Solarwinds from SolarwindsIPAddressToInterface object.""" + raise NotImplementedError + + def delete(self): + """Delete IPAddressToInterface in Solarwinds from SolarwindsIPAddressToInterface object.""" + raise NotImplementedError diff --git a/nautobot_ssot/integrations/solarwinds/jobs.py b/nautobot_ssot/integrations/solarwinds/jobs.py new file mode 100644 index 00000000..8195beb6 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/jobs.py @@ -0,0 +1,286 @@ +# pylint: disable=R0801 +"""Jobs for Solarwinds SSoT integration.""" + +from diffsync.enum import DiffSyncFlags +from django.urls import reverse +from nautobot.apps.jobs import BooleanVar, ChoiceVar, JSONVar, ObjectVar, StringVar, TextVar, register_jobs +from nautobot.dcim.models import Device, Location, LocationType +from nautobot.extras.choices import SecretsGroupAccessTypeChoices, SecretsGroupSecretTypeChoices +from nautobot.extras.models import ExternalIntegration, Role +from nautobot.tenancy.models import Tenant + +from nautobot_ssot.integrations.solarwinds.diffsync.adapters import nautobot, solarwinds +from nautobot_ssot.integrations.solarwinds.utils.solarwinds import SolarwindsClient +from nautobot_ssot.jobs.base import DataMapping, DataSource + +name = "Solarwinds SSoT" # pylint: disable=invalid-name + + +ROLE_CHOICES = (("DeviceType", "DeviceType"), ("Hostname", "Hostname")) +PULL_FROM_CHOICES = (("Containers", "Containers"), ("CustomProperty", "Custom Property")) + + +class JobConfigError(Exception): + """Custom Exception for misconfigured Job form.""" + + +class SolarwindsDataSource(DataSource): # pylint: disable=too-many-instance-attributes + """Solarwinds SSoT Data Source.""" + + integration = ObjectVar( + model=ExternalIntegration, + queryset=ExternalIntegration.objects.all(), + display_field="display", + label="Solarwinds Instance", + required=True, + ) + pull_from = ChoiceVar( + choices=PULL_FROM_CHOICES, + label="Pull Devices From:", + description="Specify whether to pull all devices from SolarWinds containers, or use a Custom Property", + required=True, + ) + custom_property = StringVar( + description="Name of SolarWinds Custom Property existing (set to True) on Devices to be synced.", + label="SolarWinds Custom Property", + required=False, + ) + location_override = ObjectVar( + model=Location, + queryset=Location.objects.all(), + description="Override using Container names for Location, all devices synced will be placed here.", + label="Location Override", + required=False, + ) + containers = TextVar( + default="ALL", + description="Comma separated list of Containers to be Imported. Use 'ALL' to import every container from Solarwinds. 
Must specify Top Container if `ALL` is specified, unless using CustomProperty.", + label="Container(s)", + required=True, + ) + top_container = TextVar( + default="", + description="Top-level Container if `ALL` containers are to be imported.", + label="Top Container", + required=False, + ) + location_type = ObjectVar( + model=LocationType, + queryset=LocationType.objects.all(), + description="LocationType to define Container(s) as. Must support Device ContentType.", + label="Location Type", + required=False, + ) + parent = ObjectVar( + model=Location, + queryset=Location.objects.all(), + description="Parent Location to assign created Containers to if specified LocationType requires parent be defined.", + label="Parent Location", + required=False, + ) + tenant = ObjectVar( + model=Tenant, + queryset=Tenant.objects.all(), + description="Tenant to assign to imported Devices.", + label="Tenant", + required=False, + ) + role_map = JSONVar( + label="Device Roles Map", description="Mapping of matching object to Role.", default={}, required=False + ) + role_choice = ChoiceVar( + choices=ROLE_CHOICES, + label="Role Map Matching Attribute", + description="Specify which Device attribute to match for Role Map.", + ) + default_role = ObjectVar( + label="Default Device Role", + model=Role, + queryset=Role.objects.all(), + query_params={"content_types": Device._meta.label_lower}, + display_field="name", + required=True, + ) + debug = BooleanVar(description="Enable for more verbose debug logging", default=False) + + def __init__(self): + """Initialize job objects.""" + super().__init__() + self.data = None + self.diffsync_flags = DiffSyncFlags.CONTINUE_ON_FAILURE + + class Meta: # pylint: disable=too-few-public-methods + """Meta data for Solarwinds.""" + + name = "Solarwinds to Nautobot" + data_source = "Solarwinds" + data_target = "Nautobot" + description = "Sync information from Solarwinds to Nautobot" + has_sensitive_variables = False + field_order = [ + "dryrun", + "debug", + "integration", + "location_type", + "pull_from", + "custom_property", + "containers", + "top_container", + "location_override", + "parent", + "tenant", + "default_role", + "role_choice", + "role_map", + ] + + @classmethod + def config_information(cls): + """Dictionary describing the configuration of this DataSource.""" + return {} + + @classmethod + def data_mappings(cls): + """List describing the data mappings involved in this DataSource.""" + return ( + DataMapping("Containers", None, "Locations", reverse("dcim:location_list")), + DataMapping("Devices", None, "Devices", reverse("dcim:device_list")), + DataMapping("Interfaces", None, "Interfaces", reverse("dcim:interface_list")), + DataMapping("Prefixes", None, "Prefixes", reverse("ipam:prefix_list")), + DataMapping("IP Addresses", None, "IP Addresses", reverse("ipam:ipaddress_list")), + DataMapping("Vendor", None, "Manufacturers", reverse("dcim:manufacturer_list")), + DataMapping("Model/DeviceType", None, "DeviceTypes", reverse("dcim:devicetype_list")), + DataMapping("Model/Vendor", None, "Platforms", reverse("dcim:platform_list")), + DataMapping("OS Version", None, "SoftwareVersions", reverse("dcim:softwareversion_list")), + ) + + def validate_containers(self): + """Confirm Job form variable for containers.""" + if self.containers == "": + self.logger.error("Containers variable must be defined with container name(s) or 'ALL'.") + raise JobConfigError + if self.pull_from == "Containers" and self.containers == "ALL" and self.top_container == "": + self.logger.error("Top 
Container must be specified if `ALL` Containers are to be imported.") + raise JobConfigError + + def validate_location_configuration(self): + """Confirm that LocationType or Location Override are set properly.""" + if not self.location_type: + if not self.location_override: + self.logger.error("A Location Type must be specified, unless using Location Override.") + raise JobConfigError + return + + if self.location_type.parent is not None and self.parent is None: + self.logger.error("LocationType %s requires Parent Location be specified.", self.location_type) + raise JobConfigError + if self.location_type.parent is None and self.parent: + self.logger.error( + "LocationType %s does not require a Parent location, but a Parent location was chosen.", + self.location_type, + ) + raise JobConfigError + + if ("dcim", "device") not in self.location_type.content_types.values_list("app_label", "model"): + self.logger.error( + "Specified LocationType %s is missing Device ContentType. Please change LocationType or add Device ContentType to %s LocationType and re-run Job.", + self.location_type, + self.location_type, + ) + raise JobConfigError + + def validate_role_map(self): + """Confirm configuration of Role Map Job var.""" + if self.role_map and not self.role_choice: + self.logger.error("Role Map Matching Attribute must be defined if Role Map is specified.") + raise JobConfigError + + def validate_custom_property(self): + """Confirm configuration of Custom Property var.""" + if self.pull_from == "CustomProperty" and not self.custom_property: + self.logger.error("Custom Property value must exist if pulling from Custom Property.") + raise JobConfigError + if self.pull_from == "CustomProperty" and not self.location_override: + self.logger.error("Location Override must be selected if pulling from CustomProperty.") + raise JobConfigError + + def load_source_adapter(self): + """Load data from Solarwinds into DiffSync models.""" + self.validate_containers() + self.validate_location_configuration() + self.validate_custom_property() + _sg = self.integration.secrets_group + username = _sg.get_secret_value( + access_type=SecretsGroupAccessTypeChoices.TYPE_HTTP, + secret_type=SecretsGroupSecretTypeChoices.TYPE_USERNAME, + ) + password = _sg.get_secret_value( + access_type=SecretsGroupAccessTypeChoices.TYPE_HTTP, + secret_type=SecretsGroupSecretTypeChoices.TYPE_PASSWORD, + ) + port = self.integration.extra_config.get("port") if self.integration.extra_config else None + retries = self.integration.extra_config.get("retries") if self.integration.extra_config else None + client = SolarwindsClient( + hostname=self.integration.remote_url, + username=username, + password=password, + port=port if port else 17774, + retries=retries if retries else 5, + timeout=self.integration.timeout, + verify=self.integration.verify_ssl, + job=self, + ) + self.source_adapter = solarwinds.SolarwindsAdapter( + job=self, + sync=self.sync, + client=client, + containers=self.containers, + location_type=self.location_type, + parent=self.parent, + tenant=self.tenant, + ) + self.source_adapter.load() + + def load_target_adapter(self): + """Load data from Nautobot into DiffSync models.""" + self.target_adapter = nautobot.NautobotAdapter(job=self, sync=self.sync) + self.target_adapter.load() + + def run( # pylint: disable=arguments-differ, too-many-arguments + self, + integration, + containers, + top_container, + dryrun, + location_type, + parent, + tenant, + role_map, + role_choice, + default_role, + memory_profiling, + debug, + *args, + 
**kwargs, + ): + """Perform data synchronization.""" + self.integration = integration + self.pull_from = kwargs["pull_from"] + self.custom_property = kwargs["custom_property"] + self.location_override = kwargs["location_override"] + self.containers = containers + self.top_container = top_container + self.location_type = location_type if location_type else self.location_override.location_type + self.parent = parent + self.tenant = tenant + self.role_map = role_map + self.role_choice = role_choice + self.default_role = default_role + self.debug = debug + self.dryrun = dryrun + self.memory_profiling = memory_profiling + super().run(dryrun=self.dryrun, memory_profiling=self.memory_profiling, *args, **kwargs) + + +jobs = [SolarwindsDataSource] +register_jobs(*jobs) diff --git a/nautobot_ssot/integrations/solarwinds/signals.py b/nautobot_ssot/integrations/solarwinds/signals.py new file mode 100644 index 00000000..9b91187c --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/signals.py @@ -0,0 +1,46 @@ +# pylint: disable=R0801 +"""Signals triggered when Nautobot starts to perform certain actions.""" + +from nautobot.core.signals import nautobot_database_ready +from nautobot.extras.choices import CustomFieldTypeChoices + + +def register_signals(sender): + """Register signals for Solarwinds integration.""" + nautobot_database_ready.connect(nautobot_database_ready_callback, sender=sender) + + +def nautobot_database_ready_callback(sender, *, apps, **kwargs): # pylint: disable=unused-argument + """Adds SNMP Location CustomField to Device, and System of Record and Last Sync'd CustomFields to Device, Prefix, and IPAddress. + + Callback function triggered by the nautobot_database_ready signal when the Nautobot database is fully ready. + """ + # pylint: disable=invalid-name, too-many-locals + ContentType = apps.get_model("contenttypes", "ContentType") + CustomField = apps.get_model("extras", "CustomField") + Device = apps.get_model("dcim", "Device") + IPAddress = apps.get_model("ipam", "IPAddress") + Prefix = apps.get_model("ipam", "Prefix") + + snmp_loc_dict = { + "key": "snmp_location", + "type": CustomFieldTypeChoices.TYPE_TEXT, + "label": "SNMP Location", + } + snmp_loc_field, _ = CustomField.objects.get_or_create(key=snmp_loc_dict["key"], defaults=snmp_loc_dict) + snmp_loc_field.content_types.add(ContentType.objects.get_for_model(Device)) + sor_cf_dict = { + "type": CustomFieldTypeChoices.TYPE_TEXT, + "key": "system_of_record", + "label": "System of Record", + } + sor_custom_field, _ = CustomField.objects.update_or_create(key=sor_cf_dict["key"], defaults=sor_cf_dict) + sync_cf_dict = { + "type": CustomFieldTypeChoices.TYPE_DATE, + "key": "last_synced_from_sor", + "label": "Last sync from System of Record", + } + sync_custom_field, _ = CustomField.objects.update_or_create(key=sync_cf_dict["key"], defaults=sync_cf_dict) + for model in [Device, IPAddress, Prefix]: + sor_custom_field.content_types.add(ContentType.objects.get_for_model(model)) + sync_custom_field.content_types.add(ContentType.objects.get_for_model(model)) diff --git a/nautobot_ssot/integrations/solarwinds/utils/__init__.py b/nautobot_ssot/integrations/solarwinds/utils/__init__.py new file mode 100644 index 00000000..8ec3c051 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/utils/__init__.py @@ -0,0 +1 @@ +"""Utility functions for working with Solarwinds and Nautobot.""" diff --git a/nautobot_ssot/integrations/solarwinds/utils/nautobot.py b/nautobot_ssot/integrations/solarwinds/utils/nautobot.py new file mode 100644 index 
00000000..09a18a45 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/utils/nautobot.py @@ -0,0 +1 @@ +"""Utility functions for working with Nautobot.""" diff --git a/nautobot_ssot/integrations/solarwinds/utils/solarwinds.py b/nautobot_ssot/integrations/solarwinds/utils/solarwinds.py new file mode 100644 index 00000000..0ff05a73 --- /dev/null +++ b/nautobot_ssot/integrations/solarwinds/utils/solarwinds.py @@ -0,0 +1,564 @@ +"""Utility functions for working with Solarwinds.""" + +import json +import re +from collections import defaultdict +from datetime import datetime +from typing import Dict, List, Optional + +import requests +import urllib3 +from netutils.bandwidth import bits_to_name +from netutils.interface import split_interface +from netutils.ip import is_netmask, netmask_to_cidr +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from nautobot_ssot.integrations.solarwinds.constants import ETH_INTERFACE_NAME_MAP, ETH_INTERFACE_SPEED_MAP + + +class SolarwindsClient: # pylint: disable=too-many-public-methods, too-many-instance-attributes + """Class for handling communication to Solarwinds.""" + + def __init__( # pylint: disable=too-many-arguments + self, + hostname: str, + username: str, + password: str, + port: int = 17774, + verify: bool = False, + session: requests.Session = None, + **kwargs, + ): + """Initialize shared variables for Solarwinds client. + + Args: + hostname (str): Hostname of the SolarWinds server to connect to + username (str): Username to authenticate with + password (str): Password to authenticate with + port (int, optional): Port on the remote server to connect to (17778=Legacy, 17774=preferred). Defaults to 17774. + verify (bool, optional): Validate the SSL Certificate when using Requests. Defaults to False. + session (requests.Session, optional): Customized requests session to use. Defaults to None. + kwargs (dict): Keyword arguments to catch unspecified keyword arguments. + """ + self.url = f"{hostname}:{port}/SolarWinds/InformationService/v3/Json/" + self._session = session or requests.Session() + self._session.auth = (username, password) + self._session.headers.update({"Content-Type": "application/json"}) + self._session.verify = verify + if not verify: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + self.job = kwargs.pop("job", None) + self.batch_size = ( + self.job.integration.extra_config.get("batch_size", 100) if self.job.integration.extra_config else 100 + ) + + # Set up retries + self.timeout = kwargs.pop("timeout", None) + self.retries = kwargs.pop("retries", None) + if self.retries is not None: + retry_strategy = Retry( + total=self.retries, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=[ + "HEAD", + "GET", + "PUT", + "DELETE", + "OPTIONS", + "TRACE", + "POST", + ], + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self._session.mount("https://", adapter) + self._session.mount("http://", adapter) + + def query(self, query: str, **params): + """Perform a query against the SolarWinds SWIS API. + + Args: + query (str): SWQL query to execute + params (dict, optional): Parameters to pass to the query. Defaults to {}. 
+ + Returns: + dict: JSON response from the SWIS API + """ + return self._req("POST", "Query", {"query": query, "parameters": params}).json() + + @staticmethod + def _json_serial(obj): # pylint: disable=inconsistent-return-statements + """JSON serializer for objects not serializable by default json code.""" + if isinstance(obj, datetime): + serial = obj.isoformat() + return serial + + def _req(self, method: str, frag: str, data: Optional[dict] = None) -> requests.Response: + """Perform the actual request to the SolarWinds SWIS API. + + Args: + method (str): HTTP method to use + frag (str): URL fragment to append to the base URL + data (dict, optional): Data payload to include in the request. Defaults to None. + + Returns: + requests.Response: Response object from the request + """ + try: + resp = self._session.request( + method, + self.url + frag, + data=json.dumps(data, default=self._json_serial), + timeout=self.timeout, + ) + + # try to extract reason from response when request returns error + if 400 <= resp.status_code < 600: + try: + resp.reason = json.loads(resp.text)["Message"] + except json.decoder.JSONDecodeError: + pass + + resp.raise_for_status() + return resp + except requests.exceptions.RequestException as err: + self.job.logger.error(f"An error occurred: {err}") + # Return an empty response object to avoid breaking the calling code + return requests.Response() + + def get_filtered_container_ids(self, containers: str) -> Dict[str, int]: + """Get a list of container IDs from Solarwinds. + + Args: + containers (str): Comma-separated list of container names to get IDs for. + + Returns: + Dict[str, int]: Dictionary of container names to IDs. + """ + container_ids = {} + for container in containers.split(","): + container_id = self.find_container_id_by_name(container_name=container) + if container_id != -1: + container_ids[container] = container_id + else: + self.job.logger.error(f"Unable to find container {container}.") + return container_ids + + def get_nodes_custom_property(self, custom_property: str) -> List[dict]: + """Get all node IDs for all nodes based on SolarWinds CustomProperty. + + Args: + custom_property (str): SolarWinds CustomProperty which must be True for Nautobot to pull in. + + Returns: + List[dict]: List of node records with the specified CustomProperty set to True. + """ + query = f"SELECT DISTINCT SysName AS Name, MemberPrimaryID FROM Orion.Nodes INNER JOIN Orion.ContainerMembers ON Nodes.NodeID = ContainerMembers.MemberPrimaryID WHERE Nodes.CustomProperties.{custom_property}='True'" # noqa: S608 + nodes = self.query(query) + + container_nodes = nodes["results"] + return container_nodes + + def get_container_nodes( + self, container_ids: Dict[str, int], custom_property: Optional[str] = None + ) -> Dict[str, List[dict]]: + """Get node IDs for all nodes in specified container ID. + + Args: + container_ids (Dict[str, int]): Dictionary of container names to their ID. + custom_property (str): Optional SolarWinds CustomProperty which must be True for Nautobot to pull in. + + Returns: + Dict[str, List[dict]]: Dictionary of container names to list of node IDs in that container. 
+ """ + container_nodes = {} + for container_name, container_id in container_ids.items(): + self.job.logger.debug(f"Gathering container nodes for {container_name} CID: {container_id}.") + container_nodes[container_name] = self.recurse_collect_container_nodes( + current_container_id=container_id, custom_property=custom_property + ) + return container_nodes + + def get_top_level_containers(self, top_container: str) -> Dict[str, int]: + """Retrieve all containers from Solarwinds. + + Returns: + Dict[str, int]: Dictionary of container names to IDs. + """ + top_container_id = self.find_container_id_by_name(container_name=top_container) + query = f"SELECT ContainerID, Name, MemberPrimaryID FROM Orion.ContainerMembers WHERE ContainerID = '{top_container_id}'" # noqa: S608 + results = self.query(query)["results"] + return {x["Name"]: x["MemberPrimaryID"] for x in results} + + def recurse_collect_container_nodes(self, current_container_id: int, custom_property: Optional[str] = None) -> list: + """Recursively gather all nodes for specified container ID. + + Args: + current_container_id (int): Container ID to retrieve nodes for. + custom_property (str): Optional SolarWinds CustomProperty which must be True for Nautobot to pull in. + + Returns: + list: List of node IDs in specified container. + """ + nodes_list = [] + if custom_property: + query = f"SELECT ContainerID, SysName AS Name, MemberEntityType, MemberPrimaryID FROM Orion.Nodes INNER JOIN Orion.ContainerMembers ON Nodes.NodeID = ContainerMembers.MemberPrimaryID WHERE Nodes.CustomProperties.{custom_property}='True' AND ContainerID = '{current_container_id}'" # noqa: S608 + else: + query = f"SELECT ContainerID, Name, MemberEntityType, MemberPrimaryID FROM Orion.ContainerMembers WHERE ContainerID = '{current_container_id}'" # noqa: S608 + container_members = self.query(query) + if container_members["results"]: + for member in container_members["results"]: + if member["MemberEntityType"] == "Orion.Groups": + self.job.logger.debug(f"Exploring container: {member['Name']} CID: {member['MemberPrimaryID']}") + nodes_list.extend(self.recurse_collect_container_nodes(member["MemberPrimaryID"])) + elif member["MemberEntityType"] == "Orion.Nodes": + nodes_list.append(member) + return nodes_list + + def find_container_id_by_name(self, container_name: str) -> int: + """Find container ID by name in Solarwinds. + + Args: + container_name (str): Name of container to be found. + + Returns: + int: ID for specified container. Returns -1 if not found. + """ + query_results = self.query( + f"SELECT ContainerID FROM Orion.Container WHERE Name = '{container_name}'" # noqa: S608 + ) + if query_results["results"]: + return query_results["results"][0]["ContainerID"] + return -1 + + def build_node_details(self, nodes: List[dict]) -> Dict[int, dict]: + """Build dictionary of node information. + + Args: + nodes (List[dict]): List of node information dictionaries. + + Returns: + Dict[int, dict]: Dictionary of node information with key being node primaryID. 
+ """ + node_details = defaultdict(dict) + for node in nodes: + node_details[node["MemberPrimaryID"]] = {"NodeHostname": node["Name"], "NodeID": node["MemberPrimaryID"]} + self.batch_fill_node_details(node_data=nodes, node_details=node_details, nodes_per_batch=self.batch_size) + self.get_node_prefix_length(node_data=nodes, node_details=node_details, nodes_per_batch=self.batch_size) + self.job.logger.info("Loading interface details for nodes.") + self.gather_interface_data(node_data=nodes, node_details=node_details, nodes_per_batch=self.batch_size) + self.gather_ipaddress_data(node_data=nodes, node_details=node_details, nodes_per_batch=self.batch_size) + return node_details + + def batch_fill_node_details(self, node_data: list, node_details: dict, nodes_per_batch: int): + """Retrieve details from Solarwinds about specified nodes. + + Args: + node_data (list): List of nodes in containers. + node_details (dict): Dictionary of node details. + nodes_per_batch (int): Number of nodes to be processed per batch. + """ + current_idx = 0 + current_batch = 1 + total_batches = ( + len(node_data) // nodes_per_batch + if len(node_data) % nodes_per_batch == 0 + else len(node_data) // nodes_per_batch + 1 + ) + + while current_idx < len(node_data): + batch_nodes = node_data[current_idx : current_idx + nodes_per_batch] # noqa E203 + current_idx += nodes_per_batch + # Get the node details + if self.job.debug: + self.job.logger.debug(f"Processing batch {current_batch} of {total_batches} - Orion.Nodes.") + details_query = """ + SELECT IOSVersion AS Version, + o.IPAddress, + Location AS SNMPLocation, + o.Vendor, + MachineType AS DeviceType, + h.Model, + h.ServiceTag, + o.NodeID + FROM Orion.Nodes o LEFT JOIN Orion.HardwareHealth.HardwareInfo h ON o.NodeID = h.NodeID + WHERE NodeID IN ( + """ + for idx, node in enumerate(batch_nodes): + details_query += f"'{node['MemberPrimaryID']}'" + if idx < len(batch_nodes) - 1: + details_query += "," + details_query += ")" + query_results = self.query(details_query) + if not query_results["results"]: + if self.job.debug: + self.job.logger.error("Error: No node details found for the batch of nodes") + continue + + for result in query_results["results"]: + if result["NodeID"] in node_details: + node_id = result["NodeID"] + node_details[node_id]["Version"] = result["Version"] + node_details[node_id]["IPAddress"] = result["IPAddress"] + node_details[node_id]["SNMPLocation"] = result["SNMPLocation"] + node_details[node_id]["Vendor"] = result["Vendor"] + node_details[node_id]["DeviceType"] = result["DeviceType"] + node_details[node_id]["Model"] = result["Model"] + node_details[node_id]["ServiceTag"] = result["ServiceTag"] + # making prefix length default of 32 and will updated to the correct value in subsequent query. + node_details[node_id]["PFLength"] = 128 if ":" in result["IPAddress"] else 32 + current_batch += 1 + + def get_node_prefix_length(self, node_data: list, node_details: dict, nodes_per_batch: int): + """Gather node prefix length from IPAM.IPInfo if available. + + Args: + node_data (list): List of nodes in containers. + node_details (dict): Dictionary of node details. + nodes_per_batch (int): Number of nodes to be processed per batch. 
+ """ + current_idx = 0 + current_batch = 1 + total_batches = ( + len(node_data) // nodes_per_batch + if len(node_data) % nodes_per_batch == 0 + else len(node_data) // nodes_per_batch + 1 + ) + + while current_idx < len(node_data): + batch_nodes = node_data[current_idx : current_idx + nodes_per_batch] # noqa E203 + current_idx += nodes_per_batch + # Get the node details + if self.job.debug: + self.job.logger.debug(f"Processing batch {current_batch} of {total_batches} - IPAM.IPInfo.") + + query = "SELECT i.CIDR AS PFLength, o.NodeID FROM Orion.Nodes o JOIN IPAM.IPInfo i ON o.IPAddressGUID = i.IPAddressN WHERE o.NodeID IN (" + for idx, node in enumerate(batch_nodes): + query += f"'{node['MemberPrimaryID']}'" + if idx < len(batch_nodes) - 1: + query += "," + query += ")" + query_results = self.query(query) + if not query_results["results"]: + if self.job.debug: + self.job.logger.error("Error: No node details found for the batch of nodes") + continue + + for result in query_results["results"]: + if result["NodeID"] in node_details: + node_details[result["NodeID"]]["PFLength"] = result["PFLength"] + current_batch += 1 + + def gather_interface_data(self, node_data: list, node_details: dict, nodes_per_batch: int): + """Retrieve interface details from Solarwinds about specified nodes. + + Args: + node_data (list): List of nodes in containers. + node_details (dict): Dictionary of node details. + nodes_per_batch (int): Number of nodes to be processed per batch. + """ + current_idx = 0 + current_batch = 1 + while current_idx < len(node_data): + batch_nodes = node_data[current_idx : current_idx + nodes_per_batch] # noqa E203 + current_idx += nodes_per_batch + query = """ + SELECT n.NodeID, + sa.StatusName AS Enabled, + so.StatusName AS Status, + i.Name, + i.MAC, + i.Speed, + i.TypeName, + i.MTU + FROM Orion.Nodes n JOIN Orion.NPM.Interfaces i ON n.NodeID = i.NodeID INNER JOIN Orion.StatusInfo sa ON i.AdminStatus = sa.StatusId INNER JOIN Orion.StatusInfo so ON i.OperStatus = so.StatusId + WHERE n.NodeID IN ( + """ + for idx, node in enumerate(batch_nodes): + query += f"'{node['MemberPrimaryID']}'" + if idx < len(batch_nodes) - 1: + query += "," + query += ")" + query_results = self.query(query) + if not query_results["results"]: + self.job.logger.error("Error: No node details found for the batch of nodes") + continue + + for result in query_results["results"]: + if result["NodeID"] in node_details: + node_id = result["NodeID"] + intf_id = result["Name"] + if not node_details[node_id].get("interfaces"): + node_details[node_id]["interfaces"] = {} + if intf_id not in node_details[node_id]["interfaces"]: + node_details[node_id]["interfaces"][intf_id] = {} + node_details[node_id]["interfaces"][intf_id]["Name"] = result["Name"] + node_details[node_id]["interfaces"][intf_id]["Enabled"] = result["Enabled"] + node_details[node_id]["interfaces"][intf_id]["Status"] = result["Status"] + node_details[node_id]["interfaces"][intf_id]["TypeName"] = result["TypeName"] + node_details[node_id]["interfaces"][intf_id]["Speed"] = result["Speed"] + node_details[node_id]["interfaces"][intf_id]["MAC"] = result["MAC"] + node_details[node_id]["interfaces"][intf_id]["MTU"] = result["MTU"] + current_batch += 1 + + @staticmethod + def standardize_device_type(node: dict) -> str: + """Method of choosing DeviceType from various potential locations and standardizing the result. + + Args: + node (dict): Node details with DeviceType and Model along with Vendor. + + Returns: + str: Standardized and sanitized string of DeviceType. 
+ """ + device_type = "" + if node.get("Vendor"): + if node.get("Model"): + device_type = node["Model"].strip() + if not device_type.strip() and node.get("DeviceType"): + device_type = node["DeviceType"].strip() + if not device_type.strip(): + return "" + + if "Aruba" in node["Vendor"]: + device_type = device_type.replace("Aruba ", "").strip() + elif "Cisco" in node["Vendor"]: + device_type = device_type.replace("Cisco", "").strip() + device_type = device_type.replace("Catalyst ", "C").strip() + if ( + device_type + and "WS-" not in device_type + and "WLC" not in device_type + and "ASR" not in device_type + and not device_type.startswith("N") + ): + device_type = f"WS-{device_type}" + elif "Palo" in node["Vendor"]: + pass # Nothing needed yet. + return device_type + + def determine_interface_type(self, interface: dict) -> str: + """Determine interface type from a combination of Interface name, speed, and TypeName. + + Args: + interface (dict): Dictionary of Interface data to use to determine type. + + Returns: + str: Interface type based upon Interface name, speed, and TypeName. + """ + intf_default = "virtual" + if interface.get("TypeName") == "ethernetCsmacd": + intf_name = split_interface(interface=interface["Name"])[0] + if intf_name in ETH_INTERFACE_NAME_MAP: + return ETH_INTERFACE_NAME_MAP[intf_name] + intf_speed = bits_to_name(int(interface["Speed"])) + if intf_speed in ETH_INTERFACE_SPEED_MAP: + return ETH_INTERFACE_SPEED_MAP[intf_speed] + if intf_name == "Ethernet": + return ETH_INTERFACE_NAME_MAP["GigabitEthernet"] + if self.job.debug: + self.job.logger.debug(f"Unable to find Ethernet interface in map: {intf_name}") + return intf_default + + @staticmethod + def extract_version(version: str) -> str: + """Extract Device software version from string. + + Args: + version (str): Version string from Solarwinds. + + Returns: + str: Extracted version string. + """ + # Match on versions that have paranthesizes in string + sanitized_version = re.sub(pattern=r",?\s[Copyright,RELEASE].*", repl="", string=version) + return sanitized_version + + def gather_ipaddress_data(self, node_data: list, node_details: dict, nodes_per_batch: int): + """Retrieve IPAddress details from Solarwinds about specified nodes. + + Args: + node_data (list): List of nodes in containers. + node_details (dict): Dictionary of node details. + nodes_per_batch (int): Number of nodes to be processed per batch. 
+ """ + current_idx = 0 + current_batch = 1 + while current_idx < len(node_data): + batch_nodes = node_data[current_idx : current_idx + nodes_per_batch] # noqa E203 + current_idx += nodes_per_batch + query = """ + SELECT NIPA.NodeID, + NIPA.InterfaceIndex, + NIPA.IPAddress, + NIPA.IPAddressType, + NPMI.Name, + NIPA.SubnetMask + FROM Orion.NodeIPAddresses NIPA INNER JOIN Orion.NPM.Interfaces NPMI ON NIPA.NodeID=NPMI.NodeID AND NIPA.InterfaceIndex=NPMI.InterfaceIndex INNER JOIN Orion.Nodes N ON NIPA.NodeID=N.NodeID + WHERE NIPA.NodeID IN ( + """ + for idx, node in enumerate(batch_nodes): + query += f"'{node['MemberPrimaryID']}'" + if idx < len(batch_nodes) - 1: + query += "," + query += ")" + query_results = self.query(query) + if not query_results["results"]: + self.job.logger.error("Error: No node details found for the batch of nodes") + continue + + for result in query_results["results"]: + if result["NodeID"] in node_details: + node_id = result["NodeID"] + ip_id = result["IPAddress"] + if is_netmask(result["SubnetMask"]): + netmask_cidr = netmask_to_cidr(netmask=result["SubnetMask"]) + else: + if ":" in result["IPAddress"]: + netmask_cidr = 128 + else: + netmask_cidr = 32 + if not node_details[node_id].get("ipaddrs"): + node_details[node_id]["ipaddrs"] = {} + if ip_id not in node_details[node_id]["ipaddrs"]: + node_details[node_id]["ipaddrs"][ip_id] = {} + node_details[node_id]["ipaddrs"][ip_id]["IPAddress"] = result["IPAddress"] + node_details[node_id]["ipaddrs"][ip_id]["SubnetMask"] = netmask_cidr + node_details[node_id]["ipaddrs"][ip_id]["IPAddressType"] = result["IPAddressType"] + node_details[node_id]["ipaddrs"][ip_id]["IntfName"] = result["Name"] + current_batch += 1 + + +def determine_role_from_devicetype(device_type: str, role_map: dict) -> str: + """Determine Device Role from passed DeviceType. + + Args: + device_type (str): DeviceType model to determine Device Role. + role_map (dict): Dictionary mapping DeviceType model to Device Role name. + + Returns: + str: Device Role name if match found else blank string. + """ + role = "" + if device_type in role_map: + return role_map[device_type] + return role + + +def determine_role_from_hostname(hostname: str, role_map: dict) -> str: + """Determine Device Role from passed Hostname. + + Args: + hostname (str): Device hostname to determine Device Role. + role_map (dict): Dictionary mapping regex patterns for Device hostnames to Device Role name. + + Returns: + str: Device Role name if match found else blank string. 
+ """ + role = "" + for pattern, role_name in role_map.items(): + if re.match(pattern, hostname): + return role_name + return role diff --git a/nautobot_ssot/jobs/__init__.py b/nautobot_ssot/jobs/__init__.py index 8da1b6b4..13346adb 100644 --- a/nautobot_ssot/jobs/__init__.py +++ b/nautobot_ssot/jobs/__init__.py @@ -20,6 +20,7 @@ "nautobot_ssot_aci": "2.2", "nautobot_ssot_dna_center": "2.2", "nautobot_ssot_meraki": "2.2", + "nautobot_ssot_solarwinds": "2.2", } diff --git a/nautobot_ssot/tests/solarwinds/__init__.py b/nautobot_ssot/tests/solarwinds/__init__.py new file mode 100644 index 00000000..feef738d --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/__init__.py @@ -0,0 +1 @@ +"""Unit tests for Solarwinds SSoT app.""" diff --git a/nautobot_ssot/tests/solarwinds/conftest.py b/nautobot_ssot/tests/solarwinds/conftest.py new file mode 100644 index 00000000..b68b61e4 --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/conftest.py @@ -0,0 +1,33 @@ +"""Params for testing.""" + +import json + +from nautobot_ssot.integrations.solarwinds.utils.solarwinds import SolarwindsClient + + +def load_json(path): + """Load a json file.""" + with open(path, encoding="utf-8") as file: + return json.loads(file.read()) + + +GET_CONTAINER_NODES_FIXTURE = load_json("./nautobot_ssot/tests/solarwinds/fixtures/get_container_nodes.json") +GET_TOP_LEVEL_CONTAINERS_FIXTURE = load_json("./nautobot_ssot/tests/solarwinds/fixtures/get_top_level_containers.json") +NODE_DETAILS_FIXTURE = load_json("./nautobot_ssot/tests/solarwinds/fixtures/node_details.json") +GET_NODES_CUSTOM_PROPERTY_FIXTURE = load_json( + "./nautobot_ssot/tests/solarwinds/fixtures/get_nodes_custom_property.json" +) + + +def create_solarwinds_client(**kwargs) -> SolarwindsClient: + """Function to initialize a SolarwindsClient object.""" + return SolarwindsClient( # nosec: B106 + hostname=kwargs.pop("hostname", "https://test.solarwinds.com"), + username=kwargs.pop("username", "admin"), + password=kwargs.pop("password", "admin"), + port=kwargs.pop("port", 443), + retries=kwargs.pop("retries", 5), + timeout=kwargs.pop("timeout", 60), + verify=kwargs.pop("verify", True), + job=kwargs.pop("job", None), + ) diff --git a/nautobot_ssot/tests/solarwinds/fixtures/get_container_nodes.json b/nautobot_ssot/tests/solarwinds/fixtures/get_container_nodes.json new file mode 100644 index 00000000..7922c7f2 --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/fixtures/get_container_nodes.json @@ -0,0 +1,16 @@ +{ + "HQ": [ + { + "ContainerID": 1, + "Name": "UNKNOWN_DEVICE_TYPE1", + "MemberEntityType": "Orion.Nodes", + "MemberPrimaryID": 10 + }, + { + "ContainerID": 1, + "Name": "Router01", + "MemberEntityType": "Orion.Nodes", + "MemberPrimaryID": 11 + } + ] +} \ No newline at end of file diff --git a/nautobot_ssot/tests/solarwinds/fixtures/get_nodes_custom_property.json b/nautobot_ssot/tests/solarwinds/fixtures/get_nodes_custom_property.json new file mode 100644 index 00000000..dc2210a9 --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/fixtures/get_nodes_custom_property.json @@ -0,0 +1,10 @@ +[ + { + "Name": "UNKNOWN_DEVICE_TYPE1", + "MemberPrimaryID": 10 + }, + { + "Name": "Router01", + "MemberPrimaryID": 11 + } +] \ No newline at end of file diff --git a/nautobot_ssot/tests/solarwinds/fixtures/get_top_level_containers.json b/nautobot_ssot/tests/solarwinds/fixtures/get_top_level_containers.json new file mode 100644 index 00000000..4fea7f9a --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/fixtures/get_top_level_containers.json @@ -0,0 +1,8 @@ +{ + "1": { + "Name": "HQ" + }, 
+ "2": { + "Name": "DC01" + } +} \ No newline at end of file diff --git a/nautobot_ssot/tests/solarwinds/fixtures/node_details.json b/nautobot_ssot/tests/solarwinds/fixtures/node_details.json new file mode 100644 index 00000000..c904835b --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/fixtures/node_details.json @@ -0,0 +1,106 @@ +{ + "10": { + "NodeHostname": "UNKNOWN_DEVICE_TYPE1", + "NodeID": 10, + "interfaces": { + "TenGigabitEthernet0/0/0": { + "Name": "TenGigabitEthernet0/0/0", + "Enabled": "Up", + "Status": "Up", + "TypeName": "ethernetCsmacd", + "Speed": 10000000000.0, + "MAC": "AA74D2BCD341", + "MTU": 9104 + }, + "TenGigabitEthernet0/1/0": { + "Name": "TenGigabitEthernet0/1/0", + "Enabled": "Unknown", + "Status": "Unknown", + "TypeName": "ethernetCsmacd", + "Speed": 10000000000.0, + "MAC": "B8D028D78C15", + "MTU": 9216 + }, + "TenGigabitEthernet0/1/0.75": { + "Name": "TenGigabitEthernet0/1/0.75", + "Enabled": "Unknown", + "Status": "Unknown", + "TypeName": "l2vlan", + "Speed": 10000000000.0, + "MAC": "G6F260AD2C18", + "MTU": 9216 + } + }, + "ipaddrs": { + "1.1.1.1": { + "IPAddress": "1.1.1.1", + "SubnetMask": 23, + "IPAddressType": "IPv4", + "IntfName": "TenGigabitEthernet0/0/0" + }, + "10.10.1.2": { + "IPAddress": "10.10.1.2", + "SubnetMask": 23, + "IPAddressType": "IPv4", + "IntfName": "TenGigabitEthernet0/1/0.75" + } + } + }, + "11": { + "NodeHostname": "Router01", + "NodeID": 11, + "Version": "03.11.01.E RELEASE SOFTWARE (fc4)", + "IPAddress": "172.16.5.2", + "PFLength": 24, + "SNMPLocation": "LOCATION STRING", + "Vendor": "Cisco", + "DeviceType": "Cisco Catalyst 4500 L3", + "Model": null, + "ServiceTag": null, + "interfaces": { + "TenGigabitEthernet1/1/1": { + "Name": "TenGigabitEthernet1/1/1", + "Enabled": "Unknown", + "Status": "Unknown", + "TypeName": "ethernetCsmacd", + "Speed": 1000000000.0, + "MAC": "F674BD01ADE4", + "MTU": 1500 + }, + "TenGigabitEthernet1/1/2": { + "Name": "TenGigabitEthernet1/1/2", + "Enabled": "Unknown", + "Status": "Unknown", + "TypeName": "ethernetCsmacd", + "Speed": 1000000000.0, + "MAC": "F674BD01ADE5", + "MTU": 1500 + } + }, + "ipaddrs": { + "10.11.1.1": { + "IPAddress": "10.11.1.1", + "SubnetMask": 23, + "IPAddressType": "IPv4", + "IntfName": "TenGigabitEthernet1/1/1" + }, + "10.11.1.2": { + "IPAddress": "10.11.1.2", + "SubnetMask": 23, + "IPAddressType": "IPv4", + "IntfName": "TenGigabitEthernet1/1/2" + }, + "172.16.1.1": { + "IPAddress": "172.16.1.1", + "SubnetMask": 24, + "IPAddressType": "IPv4", + "IntfName": "Ethernet0/1" + } + } + }, + "12": { + "NodeHostname": "net-snmp Device", + "NodeID": 12, + "Vendor": "net-snmp" + } +} \ No newline at end of file diff --git a/nautobot_ssot/tests/solarwinds/test_jobs.py b/nautobot_ssot/tests/solarwinds/test_jobs.py new file mode 100644 index 00000000..c730a052 --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/test_jobs.py @@ -0,0 +1,123 @@ +"""Tests to validate Job functions.""" + +import uuid +from unittest.mock import MagicMock + +from nautobot.core.testing import TransactionTestCase +from nautobot.dcim.models import LocationType +from nautobot.extras.models import JobResult + +from nautobot_ssot.integrations.solarwinds.jobs import JobConfigError, SolarwindsDataSource + + +class SolarwindsDataSourceTestCase(TransactionTestCase): + """Test the SolarwindsDataSource class.""" + + job_class = SolarwindsDataSource + databases = ("default", "job_logs") + + def setUp(self): + """Per-test setup.""" + super().setUp() + self.job = self.job_class() + self.job.logger.error = MagicMock() + + 
self.job.job_result = JobResult.objects.create( + name=self.job.class_path, task_name="Fake task", user=None, id=uuid.uuid4() + ) + + def test_validate_containers_blank(self): + """Validate handling of no containers being defined in Job form.""" + self.job.containers = "" + with self.assertRaises(JobConfigError): + self.job.validate_containers() + self.job.logger.error.assert_called_once_with( + "Containers variable must be defined with container name(s) or 'ALL'." + ) + + def test_validate_containers_missing_top(self): + """Validate handling of top container not defined when 'ALL' containers specified.""" + self.job.containers = "ALL" + self.job.top_container = "" + self.job.pull_from = "Containers" + with self.assertRaises(JobConfigError): + self.job.validate_containers() + self.job.logger.error.assert_called_once_with( + "Top Container must be specified if `ALL` Containers are to be imported." + ) + + def test_validate_location_configuration_missing_parent(self): + """Validate handling of validate_location_configuration() when parent Location isn't specified but required.""" + reg_lt = LocationType.objects.create(name="Region") + site_lt = LocationType.objects.create(name="Site", parent=reg_lt) + self.job.location_type = site_lt + self.job.parent = None + with self.assertRaises(JobConfigError): + self.job.validate_location_configuration() + self.job.logger.error.assert_called_once_with("LocationType %s requires Parent Location be specified.", site_lt) + + def test_validate_location_configuration_extra_parent(self): + """Validate handling of validate_location_configuration() when parent Location is specified, but not required.""" + reg_lt = LocationType.objects.create(name="Region") + site_lt = LocationType.objects.create(name="Site") + self.job.location_type = site_lt + self.job.parent = reg_lt + with self.assertRaises(JobConfigError): + self.job.validate_location_configuration() + self.job.logger.error.assert_called_once_with( + "LocationType %s does not require a Parent location, but a Parent location was chosen.", site_lt + ) + + def test_validate_location_configuration_missing_location_type(self): + self.job.pull_from = "Containers" + self.job.location_type = None + self.job.location_override = None + with self.assertRaises(JobConfigError): + self.job.validate_location_configuration() + self.job.logger.error.assert_called_once_with( + "A Location Type must be specified, unless using Location Override." + ) + + def test_validate_location_configuration_missing_device_contenttype(self): + """Validate handling of validate_location_configuration() when Device ContentType on the specified LocationType.""" + site_lt = LocationType.objects.create(name="Site") + self.job.location_type = site_lt + self.job.parent = None + with self.assertRaises(JobConfigError): + self.job.validate_location_configuration() + self.job.logger.error.assert_called_once_with( + "Specified LocationType %s is missing Device ContentType. Please change LocationType or add Device ContentType to %s LocationType and re-run Job.", + site_lt, + site_lt, + ) + + def test_validate_role_map(self): + """Validate handling of validate_role_map() when Role choice isn't specified.""" + self.job.role_map = {"ASR1001": "Router"} + self.job.role_choice = None + with self.assertRaises(JobConfigError): + self.job.validate_role_map() + self.job.logger.error.assert_called_once_with( + "Role Map Matching Attribute must be defined if Role Map is specified." 
+ ) + + def test_validate_custom_property(self): + """Validate handling of validate_custom_property() if Custom Property is missing.""" + self.job.pull_from = "CustomProperty" + self.job.custom_property = None + with self.assertRaises(JobConfigError): + self.job.validate_custom_property() + self.job.logger.error.assert_called_once_with( + "Custom Property value must exist if pulling from Custom Property." + ) + + def test_validate_custom_property_location(self): + """Validate handling of validate_custom_property() when Location Override isn't specified.""" + self.job.pull_from = "CustomProperty" + self.job.custom_property = "Nautobot_Monitoring" + self.job.location_override = None + with self.assertRaises(JobConfigError): + self.job.validate_custom_property() + self.job.logger.error.assert_called_once_with( + "Location Override must be selected if pulling from CustomProperty." + ) diff --git a/nautobot_ssot/tests/solarwinds/test_solarwinds_adapter.py b/nautobot_ssot/tests/solarwinds/test_solarwinds_adapter.py new file mode 100644 index 00000000..40868eed --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/test_solarwinds_adapter.py @@ -0,0 +1,405 @@ +"""Test Solarwinds adapter.""" + +import uuid +from unittest.mock import MagicMock, call, patch + +from diffsync.enum import DiffSyncModelFlags +from django.contrib.contenttypes.models import ContentType +from nautobot.core.testing import TransactionTestCase +from nautobot.dcim.models import Device, Location, LocationType +from nautobot.extras.models import JobResult, Role, Status + +import nautobot_ssot.tests.solarwinds.conftest as fix # move to fixtures folder? +from nautobot_ssot.integrations.solarwinds.diffsync.adapters.solarwinds import SolarwindsAdapter +from nautobot_ssot.integrations.solarwinds.jobs import SolarwindsDataSource + + +class TestSolarwindsAdapterTestCase(TransactionTestCase): # pylint: disable=too-many-public-methods + """Test NautobotSsotSolarwindsAdapter class.""" + + databases = ("default", "job_logs") + + def setUp(self): # pylint: disable=invalid-name + """Initialize test case.""" + self.status_active = Status.objects.get_or_create(name="Active")[0] + self.status_active.content_types.add(ContentType.objects.get_for_model(Device)) + + self.solarwinds_client = MagicMock() + self.solarwinds_client.get_top_level_containers.return_value = fix.GET_TOP_LEVEL_CONTAINERS_FIXTURE + self.solarwinds_client.get_filtered_container_ids.return_value = {"HQ": 1} + self.solarwinds_client.get_nodes_custom_property.return_value = fix.GET_NODES_CUSTOM_PROPERTY_FIXTURE + self.solarwinds_client.get_container_nodes.return_value = fix.GET_CONTAINER_NODES_FIXTURE + + self.containers = "HQ" + + self.location_type = LocationType.objects.get_or_create(name="Site")[0] + self.location_type.content_types.add(ContentType.objects.get_for_model(Device)) + + self.parent = Location.objects.get_or_create( + name="USA", location_type=LocationType.objects.get_or_create(name="Region")[0], status=self.status_active + )[0] + + self.job = SolarwindsDataSource() + self.job.debug = True + self.job.job_result = JobResult.objects.create( + name=self.job.class_path, task_name="Fake task", user=None, id=uuid.uuid4() + ) + self.job.logger = MagicMock() + self.job.logger.debug = MagicMock() + self.job.logger.error = MagicMock() + self.job.logger.info = MagicMock() + self.job.logger.warning = MagicMock() + self.job.location_type = self.location_type + self.job.parent = self.parent + self.job.default_role = Role.objects.get_or_create(name="Router")[0] + self.solarwinds = 
SolarwindsAdapter( + job=self.job, + sync=None, + client=self.solarwinds_client, + containers=self.containers, + location_type=self.location_type, + ) + + def test_data_loading_wo_parent(self): + """Test Nautobot SSoT Solarwinds load() function without parent specified.""" + self.solarwinds_client.standardize_device_type.side_effect = ["", "WS-C4500 L3", ""] + self.solarwinds_client.extract_version.return_value = "03.11.01.E" + self.solarwinds_client.build_node_details.return_value = fix.NODE_DETAILS_FIXTURE + self.solarwinds_client.determine_interface_type.return_value = "10gbase-t" + + self.solarwinds.load_parent = MagicMock() + self.solarwinds.load_prefix = MagicMock() + self.solarwinds.load_ipaddress = MagicMock() + self.solarwinds.load_interfaces = MagicMock() + self.solarwinds.load_ipassignment = MagicMock() + + self.solarwinds.load() + self.solarwinds.load_parent.assert_not_called() + self.job.logger.debug.assert_has_calls( + [ + call("Retrieving node details from Solarwinds for HQ."), + call( + 'Node details: {\n "10": {\n "NodeHostname": "UNKNOWN_DEVICE_TYPE1",\n "NodeID": 10,\n "interfaces": {\n "TenGigabitEthernet0/0/0": {\n "Name": "TenGigabitEthernet0/0/0",\n "Enabled": "Up",\n "Status": "Up",\n "TypeName": "ethernetCsmacd",\n "Speed": 10000000000.0,\n "MAC": "AA74D2BCD341",\n "MTU": 9104\n },\n "TenGigabitEthernet0/1/0": {\n "Name": "TenGigabitEthernet0/1/0",\n "Enabled": "Unknown",\n "Status": "Unknown",\n "TypeName": "ethernetCsmacd",\n "Speed": 10000000000.0,\n "MAC": "B8D028D78C15",\n "MTU": 9216\n },\n "TenGigabitEthernet0/1/0.75": {\n "Name": "TenGigabitEthernet0/1/0.75",\n "Enabled": "Unknown",\n "Status": "Unknown",\n "TypeName": "l2vlan",\n "Speed": 10000000000.0,\n "MAC": "G6F260AD2C18",\n "MTU": 9216\n }\n },\n "ipaddrs": {\n "1.1.1.1": {\n "IPAddress": "1.1.1.1",\n "SubnetMask": 23,\n "IPAddressType": "IPv4",\n "IntfName": "TenGigabitEthernet0/0/0"\n },\n "10.10.1.2": {\n "IPAddress": "10.10.1.2",\n "SubnetMask": 23,\n "IPAddressType": "IPv4",\n "IntfName": "TenGigabitEthernet0/1/0.75"\n }\n }\n },\n "11": {\n "NodeHostname": "Router01",\n "NodeID": 11,\n "Version": "03.11.01.E RELEASE SOFTWARE (fc4)",\n "IPAddress": "172.16.5.2",\n "PFLength": 24,\n "SNMPLocation": "LOCATION STRING",\n "Vendor": "Cisco",\n "DeviceType": "Cisco Catalyst 4500 L3",\n "Model": null,\n "ServiceTag": null,\n "interfaces": {\n "TenGigabitEthernet1/1/1": {\n "Name": "TenGigabitEthernet1/1/1",\n "Enabled": "Unknown",\n "Status": "Unknown",\n "TypeName": "ethernetCsmacd",\n "Speed": 1000000000.0,\n "MAC": "F674BD01ADE4",\n "MTU": 1500\n },\n "TenGigabitEthernet1/1/2": {\n "Name": "TenGigabitEthernet1/1/2",\n "Enabled": "Unknown",\n "Status": "Unknown",\n "TypeName": "ethernetCsmacd",\n "Speed": 1000000000.0,\n "MAC": "F674BD01ADE5",\n "MTU": 1500\n }\n },\n "ipaddrs": {\n "10.11.1.1": {\n "IPAddress": "10.11.1.1",\n "SubnetMask": 23,\n "IPAddressType": "IPv4",\n "IntfName": "TenGigabitEthernet1/1/1"\n },\n "10.11.1.2": {\n "IPAddress": "10.11.1.2",\n "SubnetMask": 23,\n "IPAddressType": "IPv4",\n "IntfName": "TenGigabitEthernet1/1/2"\n },\n "172.16.1.1": {\n "IPAddress": "172.16.1.1",\n "SubnetMask": 24,\n "IPAddressType": "IPv4",\n "IntfName": "Ethernet0/1"\n }\n }\n },\n "12": {\n "NodeHostname": "net-snmp Device",\n "NodeID": 12,\n "Vendor": "net-snmp"\n }\n}' + ), + ] + ) + self.assertEqual( + { + dev["NodeHostname"] + for _, dev in fix.NODE_DETAILS_FIXTURE.items() + if dev.get("Model") or dev.get("DeviceType") + }, + {dev.get_unique_id() for dev in self.solarwinds.get_all("device")}, 
+ ) + self.solarwinds.load_prefix.assert_called() + self.solarwinds.load_prefix.assert_has_calls( + [ + call(network="172.16.5.0/24"), + call(network="10.11.0.0/23"), + call(network="10.11.0.0/23"), + call(network="172.16.1.0/24"), + ] + ) + self.solarwinds.load_ipaddress.assert_called() + self.solarwinds.load_ipaddress.assert_has_calls( + [ + call(addr="172.16.5.2", prefix_length=24, prefix="172.16.5.0/24", addr_type="IPv4"), + call(addr="10.11.1.1", prefix_length=23, prefix="10.11.0.0/23", addr_type="IPv4"), + call(addr="10.11.1.2", prefix_length=23, prefix="10.11.0.0/23", addr_type="IPv4"), + call(addr="172.16.1.1", prefix_length=24, prefix="172.16.1.0/24", addr_type="IPv4"), + ] + ) + + loaded_dev = self.solarwinds.get("device", "Router01") + self.solarwinds.load_interfaces.assert_called() + self.solarwinds.load_interfaces.assert_has_calls( + [ + call(device=loaded_dev, intfs=fix.NODE_DETAILS_FIXTURE["11"]["interfaces"]), + call(device=loaded_dev, intfs={1: {"Name": "Management", "Enabled": "Up", "Status": "Up"}}), + ] + ) + self.solarwinds.load_ipassignment.assert_has_calls( + [ + call( + addr="172.16.5.2", + dev_name="Router01", + intf_name="Management", + addr_type="IPv4", + mgmt_addr="172.16.5.2", + ), + call( + addr="10.11.1.1", + dev_name="Router01", + intf_name="TenGigabitEthernet1/1/1", + addr_type="IPv4", + mgmt_addr="172.16.5.2", + ), + call( + addr="10.11.1.2", + dev_name="Router01", + intf_name="TenGigabitEthernet1/1/2", + addr_type="IPv4", + mgmt_addr="172.16.5.2", + ), + call( + addr="172.16.1.1", + dev_name="Router01", + intf_name="Ethernet0/1", + addr_type="IPv4", + mgmt_addr="172.16.5.2", + ), + ] + ) + self.job.logger.error.assert_has_calls( + [ + call("UNKNOWN_DEVICE_TYPE1 is missing DeviceType so won't be imported."), + call("net-snmp Device is showing as net-snmp so won't be imported."), + ] + ) + self.assertEqual(len(self.solarwinds.failed_devices), 2) + self.job.logger.warning.assert_called_with( + 'List of 2 devices that were unable to be loaded. 
[\n {\n "NodeHostname": "UNKNOWN_DEVICE_TYPE1",\n "NodeID": 10,\n "interfaces": {\n "TenGigabitEthernet0/0/0": {\n "Name": "TenGigabitEthernet0/0/0",\n "Enabled": "Up",\n "Status": "Up",\n "TypeName": "ethernetCsmacd",\n "Speed": 10000000000.0,\n "MAC": "AA74D2BCD341",\n "MTU": 9104\n },\n "TenGigabitEthernet0/1/0": {\n "Name": "TenGigabitEthernet0/1/0",\n "Enabled": "Unknown",\n "Status": "Unknown",\n "TypeName": "ethernetCsmacd",\n "Speed": 10000000000.0,\n "MAC": "B8D028D78C15",\n "MTU": 9216\n },\n "TenGigabitEthernet0/1/0.75": {\n "Name": "TenGigabitEthernet0/1/0.75",\n "Enabled": "Unknown",\n "Status": "Unknown",\n "TypeName": "l2vlan",\n "Speed": 10000000000.0,\n "MAC": "G6F260AD2C18",\n "MTU": 9216\n }\n },\n "ipaddrs": {\n "1.1.1.1": {\n "IPAddress": "1.1.1.1",\n "SubnetMask": 23,\n "IPAddressType": "IPv4",\n "IntfName": "TenGigabitEthernet0/0/0"\n },\n "10.10.1.2": {\n "IPAddress": "10.10.1.2",\n "SubnetMask": 23,\n "IPAddressType": "IPv4",\n "IntfName": "TenGigabitEthernet0/1/0.75"\n }\n },\n "error": "Unable to determine DeviceType."\n },\n {\n "NodeHostname": "net-snmp Device",\n "NodeID": 12,\n "Vendor": "net-snmp",\n "error": "Unable to determine DeviceType."\n }\n]' + ) + + def test_data_loading_w_parent(self): + """Test Nautobot SSoT Solarwinds load() function with parent specified.""" + self.solarwinds = SolarwindsAdapter( + job=self.job, + sync=None, + client=self.solarwinds_client, + containers=self.containers, + location_type=self.location_type, + parent=self.parent, + ) + + self.solarwinds.load_parent = MagicMock() + self.solarwinds.get_container_nodes = MagicMock() + + self.solarwinds.load() + self.solarwinds.load_parent.assert_called_once() + self.solarwinds.get_container_nodes.assert_called_once() + + def test_load_manufacturer_and_device_type(self): + """Test the load_manufacturer_and_device_type() function for success.""" + self.solarwinds.load_manufacturer_and_device_type(manufacturer="Cisco", device_type="ASR1001") + self.assertEqual({"Cisco"}, {manu.get_unique_id() for manu in self.solarwinds.get_all("manufacturer")}) + self.assertEqual({"ASR1001__Cisco"}, {manu.get_unique_id() for manu in self.solarwinds.get_all("device_type")}) + + def test_get_nodes_custom_property(self): + """Test the get_nodes_custom_property() function success.""" + results = self.solarwinds_client.get_nodes_custom_property(custom_property="Nautobot_Monitoring") + self.solarwinds_client.get_nodes_custom_property.assert_called_once_with(custom_property="Nautobot_Monitoring") + self.solarwinds_client.get_nodes_custom_property.assert_called() + + self.assertEqual(results, fix.GET_NODES_CUSTOM_PROPERTY_FIXTURE) + + def test_get_container_nodes_specific_container(self): + """Test the get_container_nodes() function success with a specific container.""" + results = self.solarwinds.get_container_nodes() + self.assertEqual(self.solarwinds.containers, "HQ") + self.solarwinds_client.get_filtered_container_ids.assert_called_once_with(containers="HQ") + self.solarwinds_client.get_container_nodes.assert_called() + self.assertEqual(results, fix.GET_CONTAINER_NODES_FIXTURE) + + def test_get_container_nodes_all_containers(self): + """Test the get_container_nodes() function success with all containers.""" + self.solarwinds.containers = "ALL" + self.job.top_container = "USA" + results = self.solarwinds.get_container_nodes() + self.solarwinds_client.get_top_level_containers.assert_called_once_with(top_container="USA") + self.solarwinds_client.get_container_nodes.assert_called() + self.assertEqual(results, 
fix.GET_CONTAINER_NODES_FIXTURE) + + def test_load_location(self): + """Test the load_location() function.""" + self.solarwinds.load_location(loc_name="HQ", location_type="Site", status="Active") + self.assertEqual( + {"HQ__Site__None__None__None__None"}, {loc.get_unique_id() for loc in self.solarwinds.get_all("location")} + ) + + def test_load_parent(self): + """Test the load_parent() function loads the Parent Location.""" + self.solarwinds = SolarwindsAdapter( + job=self.job, + sync=None, + client=self.solarwinds_client, + containers=self.containers, + location_type=self.location_type, + parent=self.parent, + ) + self.solarwinds.load_parent() + self.assertEqual( + {"USA__Region__None__None__None__None"}, + {loc.get_unique_id() for loc in self.solarwinds.get_all("location")}, + ) + parent = self.solarwinds.get("location", "USA__Region__None__None__None__None") + self.assertEqual(parent.model_flags, DiffSyncModelFlags.SKIP_UNMATCHED_DST) + + def load_sites_wo_parent(self): + """Test the load_sites() function when a parent isn't specified.""" + test_sites = { + "HQ": [ + {"ContainerID": 1, "MemberPrimaryID": 10}, + {"ContainerID": 1, "MemberPrimaryID": 11}, + ], + "DC01": [ + {"ContainerID": 2, "MemberPrimaryID": 20}, + {"ContainerID": 2, "MemberPrimaryID": 21}, + ], + } + self.solarwinds.load_sites(container_nodes=test_sites) + self.job.logger.debug.calls[0].assert_called_with("Found 2 nodes for HQ container.") + self.job.logger.debug.calls[1].assert_called_with("Found 2 nodes for DC01 container.") + self.assertEqual( + {"HQ__Site__None__None__None__None", "DC01__Site__None__None__None__None"}, + {loc.get_unique_id() for loc in self.solarwinds.get_all("location")}, + ) + + def load_sites_w_parent(self): + """Test the load_sites() function when a parent isn't specified.""" + self.solarwinds = SolarwindsAdapter( + job=self.job, + sync=None, + client=self.solarwinds_client, + containers=self.containers, + location_type=self.location_type, + parent=self.parent, + ) + test_sites = { + "HQ": [ + {"ContainerID": 1, "MemberPrimaryID": 10}, + {"ContainerID": 1, "MemberPrimaryID": 11}, + ], + "DC01": [ + {"ContainerID": 2, "MemberPrimaryID": 20}, + {"ContainerID": 2, "MemberPrimaryID": 21}, + ], + } + self.solarwinds.load_sites(container_nodes=test_sites) + self.job.logger.debug.calls[0].assert_called_with("Found 2 nodes for HQ container.") + self.job.logger.debug.calls[1].assert_called_with("Found 2 nodes for DC01 container.") + self.assertEqual( + {"HQ__Site__USA__Region", "DC01__Site__USA__Region"}, + {loc.get_unique_id() for loc in self.solarwinds.get_all("location")}, + ) + + @patch("nautobot_ssot.integrations.solarwinds.diffsync.adapters.solarwinds.determine_role_from_devicetype") + def test_determine_device_role_device_type(self, mock_func): + """Test the determine_device_role() when DeviceType role choice is specified.""" + self.job.role_map = {"ASR1001": "Router"} + self.job.role_choice = "DeviceType" + + self.solarwinds.determine_device_role(node={}, device_type="ASR1001") + mock_func.assert_called_with(device_type="ASR1001", role_map={"ASR1001": "Router"}) + + @patch("nautobot_ssot.integrations.solarwinds.diffsync.adapters.solarwinds.determine_role_from_hostname") + def test_determine_device_role_hostname(self, mock_func): + """Test the determine_device_role() when Hostname role choice is specified.""" + self.job.role_map = {".*router.*": "Router"} + self.job.role_choice = "Hostname" + + self.solarwinds.determine_device_role(node={"NodeHostname": "core-router.corp"}, device_type="") + 
mock_func.assert_called_with(hostname="core-router.corp", role_map={".*router.*": "Router"}) + + def test_load_role(self): + """Test the load_role() success.""" + self.solarwinds.load_role(role="Test") + self.assertEqual({"Test"}, {role.get_unique_id() for role in self.solarwinds.get_all("role")}) + + def test_load_platform_ios(self): + """Test the load_platform() function with IOS device.""" + result = self.solarwinds.load_platform(device_type="ASR1001", manufacturer="Cisco") + self.assertEqual(result, "cisco.ios.ios") + self.assertEqual( + {"cisco.ios.ios__Cisco"}, {plat.get_unique_id() for plat in self.solarwinds.get_all("platform")} + ) + + def test_load_platform_nxos(self): + """Test the load_platform() function with Nexus device.""" + result = self.solarwinds.load_platform(device_type="N9K-93180YC", manufacturer="Cisco") + self.assertEqual(result, "cisco.nxos.nxos") + self.assertEqual( + {"cisco.nxos.nxos__Cisco"}, {plat.get_unique_id() for plat in self.solarwinds.get_all("platform")} + ) + + def test_load_interfaces(self): + """Test the load_interfaces() functions successfully.""" + mock_dev = MagicMock() + mock_dev.name = "Test Device" + + self.solarwinds_client.determine_interface_type.return_value = "1000base-t" + + test_intfs = { + "GigabitEthernet0/1": { + "Name": "GigabitEthernet0/1", + "Enabled": "Up", + "Status": "Up", + "MTU": 9180, + "MAC": "112233445566", + }, + "GigabitEthernet0/2": { + "Name": "GigabitEthernet0/2", + "Enabled": "Up", + "Status": "Up", + "MTU": 9180, + "MAC": "112233445567", + }, + } + self.solarwinds.load_interfaces(device=mock_dev, intfs=test_intfs) + self.assertEqual( + {"GigabitEthernet0/1__Test Device", "GigabitEthernet0/2__Test Device"}, + {intf.get_unique_id() for intf in self.solarwinds.get_all("interface")}, + ) + self.solarwinds_client.determine_interface_type.assert_called() + mock_dev.add_child.assert_called() + + def test_load_prefix(self): + """Validate that the load_prefix() function loads Prefix DiffSync object.""" + self.solarwinds.load_prefix(network="10.0.0.0/24") + self.assertEqual({"10.0.0.0__24__Global"}, {pf.get_unique_id() for pf in self.solarwinds.get_all("prefix")}) + + def test_load_ipaddress(self): + """Validate that load_ipaddress() correctly loads a DiffSync object.""" + self.solarwinds.load_ipaddress(addr="10.0.0.1", prefix_length=24, prefix="10.0.0.0/24", addr_type="IPv4") + self.assertEqual( + {"10.0.0.1__10.0.0.0__24__Global"}, + {ipaddr.get_unique_id() for ipaddr in self.solarwinds.get_all("ipaddress")}, + ) + + def test_load_ipassignment(self): + """Validate that load_ipassignment() correctly loads a DiffSync object.""" + self.solarwinds.load_ipassignment( + addr="10.0.0.1", dev_name="Test Device", intf_name="Management", addr_type="IPv4", mgmt_addr="10.0.0.1" + ) + self.assertEqual( + {"Test Device__Management__10.0.0.1"}, + {assignment.get_unique_id() for assignment in self.solarwinds.get_all("ipassignment")}, + ) + + def test_reprocess_ip_parent_prefixes_more_specific(self): + """Validate that reprocess_ip_parent_prefixes identifies a more specific prefix.""" + self.solarwinds.load_prefix(network="10.0.0.0/24") + self.solarwinds.load_prefix(network="10.0.0.0/25") + self.solarwinds.load_ipaddress(addr="10.0.0.1", prefix_length=24, prefix="10.0.0.0/24", addr_type="IPv4") + self.solarwinds.reprocess_ip_parent_prefixes() + self.job.debug = True + self.job.logger.debug.assert_called_once_with( + "More specific subnet %s found for IP %s/%s", "10.0.0.0/25", "10.0.0.1", 24 + ) + self.assertEqual( + 
{"10.0.0.1__10.0.0.0__25__Global"}, + {ipaddr.get_unique_id() for ipaddr in self.solarwinds.get_all("ipaddress")}, + ) + + def test_reprocess_ip_parent_prefixes_no_update(self): + """Validate that reprocess_ip_parent_prefixes does not update the ip.""" + self.solarwinds.load_prefix(network="10.0.0.0/24") + self.solarwinds.load_prefix(network="10.0.0.0/23") + self.solarwinds.load_ipaddress(addr="10.0.0.1", prefix_length=24, prefix="10.0.0.0/24", addr_type="IPv4") + self.solarwinds.reprocess_ip_parent_prefixes() + self.job.debug = True + self.job.logger.debug.assert_not_called() + self.assertEqual( + {"10.0.0.1__10.0.0.0__24__Global"}, + {ipaddr.get_unique_id() for ipaddr in self.solarwinds.get_all("ipaddress")}, + ) diff --git a/nautobot_ssot/tests/solarwinds/test_utils_solarwinds.py b/nautobot_ssot/tests/solarwinds/test_utils_solarwinds.py new file mode 100644 index 00000000..d23c5a46 --- /dev/null +++ b/nautobot_ssot/tests/solarwinds/test_utils_solarwinds.py @@ -0,0 +1,559 @@ +# pylint: disable=R0801 +"""Test Solarwinds utility functions and client.""" + +import uuid +from datetime import datetime +from unittest.mock import MagicMock, patch + +import requests +from nautobot.core.testing import TransactionTestCase +from nautobot.extras.models import JobResult +from parameterized import parameterized + +from nautobot_ssot.integrations.solarwinds.jobs import SolarwindsDataSource +from nautobot_ssot.integrations.solarwinds.utils.solarwinds import ( + determine_role_from_devicetype, + determine_role_from_hostname, +) +from nautobot_ssot.tests.solarwinds.conftest import create_solarwinds_client + + +class TestSolarwindsClientTestCase(TransactionTestCase): # pylint: disable=too-many-public-methods + """Test the SolarwindsClient class.""" + + databases = ("default", "job_logs") + + def setUp(self): + """Configure shared variables for tests.""" + self.job = SolarwindsDataSource() + self.job.job_result = JobResult.objects.create( + name=self.job.class_path, task_name="Fake task", user=None, id=uuid.uuid4() + ) + self.job.integration = MagicMock() + self.job.integration.extra_config = {"batch_size": 10} + self.job.logger.debug = MagicMock() + self.job.logger.error = MagicMock() + self.job.logger.info = MagicMock() + self.job.logger.warning = MagicMock() + self.test_client = create_solarwinds_client(job=self.job) + + self.test_nodes = [{"Name": "Router01", "MemberPrimaryID": 1}, {"Name": "Switch01", "MemberPrimaryID": 2}] + self.node_details = {1: {"NodeHostname": "Router01", "NodeID": 1}, 2: {"NodeHostname": "Switch01", "NodeID": 2}} + + def test_solarwinds_client_initialization(self): + """Validate the SolarwindsClient functionality.""" + self.assertEqual(self.test_client.url, "https://test.solarwinds.com:443/SolarWinds/InformationService/v3/Json/") + self.assertEqual(self.test_client.job, self.job) + self.assertEqual(self.test_client.batch_size, 10) + self.assertEqual(self.test_client.timeout, 60) + self.assertEqual(self.test_client.retries, 5) + + def test_query(self): + """Validate that query() works as expected.""" + mock_expected = MagicMock(spec=requests.Response) + mock_expected.status_code = 200 + mock_expected.json.return_value = {"results": {"1": {"Name": "HQ"}}} + self.test_client._req = MagicMock() # pylint: disable=protected-access + self.test_client._req.return_value = mock_expected # pylint: disable=protected-access + result = self.test_client.query(query="SELECT ContainerID FROM Orion.Container WHERE Name = 'HQ'") + self.test_client._req.assert_called_with( # pylint: 
disable=protected-access + "POST", "Query", {"query": "SELECT ContainerID FROM Orion.Container WHERE Name = 'HQ'", "parameters": {}} + ) + self.assertEqual(result, {"results": {"1": {"Name": "HQ"}}}) + + def test_json_serial(self): + """Validate the _json_serial() functionality.""" + test_datetime = datetime(2020, 1, 1, 12, 0, 0) + expected_serialized = "2020-01-01T12:00:00" + result = self.test_client._json_serial(test_datetime) # pylint: disable=protected-access + self.assertEqual(expected_serialized, result) + + @patch("nautobot_ssot.integrations.solarwinds.utils.solarwinds.requests.Session.request") + def test_successful_request(self, mock_request): + """Validate successful functionality of the _req() function.""" + mock_response = MagicMock(requests.Response) + mock_response.status_code = 200 + mock_request.return_value = mock_response + + response = self.test_client._req("GET", "test") # pylint: disable=protected-access + + self.assertEqual(response.status_code, 200) + mock_request.assert_called_once_with("GET", self.test_client.url + "test", data="null", timeout=60) + + @patch("nautobot_ssot.integrations.solarwinds.utils.solarwinds.requests.Session.request") + def test_request_with_data(self, mock_request): + """Validate successful functionality of the _req() function with data passed.""" + mock_response = MagicMock(requests.Response) + mock_response.status_code = 201 + mock_request.return_value = mock_response + + response = self.test_client._req("POST", "create", data={"key": "value"}) # pylint: disable=protected-access + + self.assertEqual(response.status_code, 201) + mock_request.assert_called_once_with( + "POST", self.test_client.url + "create", data='{"key": "value"}', timeout=60 + ) + + @patch("nautobot_ssot.integrations.solarwinds.utils.solarwinds.requests.Session.request") + def test_request_400_600_status_code(self, mock_request): + """Validate handling of _req() call when 4xx or 5xx status code returned.""" + mock_response = MagicMock(requests.Response) + mock_response.status_code = 401 + mock_response.text = '{"Message": "Unauthorized"}' + mock_request.return_value = mock_response + + response = self.test_client._req("GET", "unauthorized") # pylint: disable=protected-access + + self.assertEqual(response.status_code, 401) + self.assertEqual(response.reason, "Unauthorized") + self.assertIsInstance(response, requests.Response) + mock_request.assert_called_once_with("GET", self.test_client.url + "unauthorized", data="null", timeout=60) + + @patch("nautobot_ssot.integrations.solarwinds.utils.solarwinds.requests.Session.request") + def test_request_json_decoding_error_handling(self, mock_request): + """Validate handling of JSON decoding error in _req() call.""" + mock_response = MagicMock(requests.Response) + mock_response.status_code = 500 + mock_response.text = '{"key": "value"' + mock_request.return_value = mock_response + + response = self.test_client._req("GET", "decode_error") # pylint: disable=protected-access + + self.assertEqual(response.status_code, 500) + self.assertIsInstance(response, requests.Response) + mock_request.assert_called_once_with("GET", self.test_client.url + "decode_error", data="null", timeout=60) + + @patch("nautobot_ssot.integrations.solarwinds.utils.solarwinds.requests.Session.request") + def test_request_exception_handling(self, mock_request): + """Validate handling of Exception thrown in _req() call.""" + mock_request.side_effect = requests.exceptions.RequestException("Request timed out") + + response = self.test_client._req("GET", "timeout") # 
pylint: disable=protected-access + + self.job.logger.error.assert_called_with("An error occurred: Request timed out") + self.assertEqual(response.status_code, None) + self.assertIsInstance(response, requests.Response) + self.assertEqual(response.content, None) + mock_request.assert_called_once_with("GET", self.test_client.url + "timeout", data="null", timeout=60) + + def test_get_filtered_container_ids_success(self): + """Validate successful retrieval of container IDs with get_filtered_container_ids().""" + self.test_client.find_container_id_by_name = MagicMock() + self.test_client.find_container_id_by_name.side_effect = [1, 2] + + expected = {"DC01": 1, "DC02": 2} + result = self.test_client.get_filtered_container_ids(containers="DC01,DC02") + self.assertEqual(result, expected) + self.job.logger.error.assert_not_called() + + def test_get_filtered_container_ids_failure(self): + """Validate failed retrieval of container IDs with get_filtered_container_ids().""" + self.test_client.find_container_id_by_name = MagicMock() + self.test_client.find_container_id_by_name.return_value = -1 + + result = self.test_client.get_filtered_container_ids(containers="Failure") + self.job.logger.error.assert_called_once_with("Unable to find container Failure.") + self.assertEqual(result, {}) + + def test_get_container_nodes(self): + """Validate functionality of get_container_nodes().""" + container_ids = {"DC01": 1} + self.test_client.recurse_collect_container_nodes = MagicMock() + self.test_client.recurse_collect_container_nodes.return_value = [1, 2, 3] + result = self.test_client.get_container_nodes(container_ids=container_ids) + + self.job.logger.debug.assert_called_once_with("Gathering container nodes for DC01 CID: 1.") + self.test_client.recurse_collect_container_nodes.assert_called_once() + self.assertEqual(result, {"DC01": [1, 2, 3]}) + + def test_get_top_level_containers(self): + """Validate functionality of get_top_level_containers().""" + self.test_client.find_container_id_by_name = MagicMock() + self.test_client.find_container_id_by_name.return_value = 1 + self.test_client.query = MagicMock() + self.test_client.query.return_value = { + "results": [ + {"ContainerID": 1, "Name": "Test", "MemberPrimaryID": 10}, + {"ContainerID": 1, "Name": "Test2", "MemberPrimaryID": 11}, + ] + } + + result = self.test_client.get_top_level_containers(top_container="Top") + self.assertEqual(result, {"Test": 10, "Test2": 11}) + self.test_client.find_container_id_by_name.assert_called_once_with(container_name="Top") + + def test_recurse_collect_container_nodes(self): + """Validate functionality of recurse_collect_container_nodes() finding Orion.Nodes EntityType.""" + + self.test_client.query = MagicMock() + self.test_client.query.side_effect = [ + { + "results": [ + {"Name": "Room01", "MemberEntityType": "Orion.Groups", "MemberPrimaryID": 20}, + {"Name": "DistroSwitch01", "MemberEntityType": "Orion.Nodes", "MemberPrimaryID": 21}, + ] + }, + {"results": [{"Name": "Room01-Router", "MemberEntityType": "Orion.Nodes", "MemberPrimaryID": 30}]}, + ] + + result = self.test_client.recurse_collect_container_nodes(current_container_id=1) + + self.job.logger.debug.assert_called_once_with("Exploring container: Room01 CID: 20") + self.assertEqual( + result, + [ + {"Name": "Room01-Router", "MemberEntityType": "Orion.Nodes", "MemberPrimaryID": 30}, + {"Name": "DistroSwitch01", "MemberEntityType": "Orion.Nodes", "MemberPrimaryID": 21}, + ], + ) + + def test_find_container_id_by_name_success(self): + """Validate successful functionality of 
find_container_id_by_name() finding container ID by name.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = {"results": [{"ContainerID": 1}]} + results = self.test_client.find_container_id_by_name(container_name="Test") + self.assertEqual(results, 1) + self.test_client.query.assert_called_once_with("SELECT ContainerID FROM Orion.Container WHERE Name = 'Test'") + + def test_find_container_id_by_name_failure(self): + """Validate failure functionality of find_container_id_by_name() finding container ID by name.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = {"results": []} + results = self.test_client.find_container_id_by_name(container_name="Test") + self.assertEqual(results, -1) + + def test_build_node_details(self): + """Validate functionality of build_node_details().""" + self.test_client.batch_fill_node_details = MagicMock() + self.test_client.get_node_prefix_length = MagicMock() + self.test_client.gather_interface_data = MagicMock() + self.test_client.gather_ipaddress_data = MagicMock() + result = self.test_client.build_node_details(nodes=self.test_nodes) + + self.test_client.batch_fill_node_details.assert_called_once_with( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.test_client.get_node_prefix_length.assert_called_once_with( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.job.logger.info.assert_called_once_with("Loading interface details for nodes.") + self.test_client.gather_interface_data.assert_called_once_with( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.test_client.gather_ipaddress_data.assert_called_once_with( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.assertEqual(result, self.node_details) + + def test_batch_fill_node_details_success(self): + """Validate successful functionality of batch_fill_node_details() to fill in node details.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = { + "results": [ + { + "NodeID": 1, + "Version": "v1", + "IPAddress": "192.168.1.1", + "SNMPLocation": "", + "Vendor": "Cisco", + "DeviceType": "Cisco Catalyst 3560-G24TS", + "Model": "WS-C3560G-24TS-S", + "ServiceTag": "", + } + ] + } + self.test_client.batch_fill_node_details( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.job.logger.debug.assert_called_once_with("Processing batch 1 of 1 - Orion.Nodes.") + self.test_client.query.assert_called_once_with( + "\n SELECT IOSVersion AS Version,\n o.IPAddress,\n Location AS SNMPLocation,\n o.Vendor,\n MachineType AS DeviceType,\n h.Model,\n h.ServiceTag,\n o.NodeID\n FROM Orion.Nodes o LEFT JOIN Orion.HardwareHealth.HardwareInfo h ON o.NodeID = h.NodeID\n WHERE NodeID IN (\n '1','2')" + ) + self.assertEqual( + self.node_details, + { + 1: { + "NodeHostname": "Router01", + "NodeID": 1, + "Version": "v1", + "IPAddress": "192.168.1.1", + "SNMPLocation": "", + "Vendor": "Cisco", + "DeviceType": "Cisco Catalyst 3560-G24TS", + "Model": "WS-C3560G-24TS-S", + "ServiceTag": "", + "PFLength": 32, + }, + 2: {"NodeHostname": "Switch01", "NodeID": 2}, + }, + ) + + def test_batch_fill_node_details_failure(self): + """Validate functionality of batch_fill_node_details() when no information is returned.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = {"results": []} + self.test_client.batch_fill_node_details( + 
node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.job.logger.error.assert_called_once_with("Error: No node details found for the batch of nodes") + + def test_get_node_prefix_length_success(self): + """Validate functionality of get_node_prefix_length() when data returned.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = {"results": [{"NodeID": 1, "PFLength": 32}]} + self.test_client.get_node_prefix_length( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.job.logger.debug.assert_called_once_with("Processing batch 1 of 1 - IPAM.IPInfo.") + self.test_client.query.assert_called_once_with( + "SELECT i.CIDR AS PFLength, o.NodeID FROM Orion.Nodes o JOIN IPAM.IPInfo i ON o.IPAddressGUID = i.IPAddressN WHERE o.NodeID IN ('1','2')" + ) + self.assertEqual( + self.node_details, + { + 1: { + "NodeHostname": "Router01", + "NodeID": 1, + "PFLength": 32, + }, + 2: {"NodeHostname": "Switch01", "NodeID": 2}, + }, + ) + + def test_get_node_prefix_length_failure(self): + """Validate functionality of get_node_prefix_length() when no information is returned.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = {"results": []} + self.test_client.get_node_prefix_length( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.job.logger.error.assert_called_once_with("Error: No node details found for the batch of nodes") + + def test_gather_interface_data_success(self): + """Validate functionality of gather_interface_data() when data is returned.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = { + "results": [ + { + "NodeID": 1, + "Name": "TenGigabitEthernet0/0/0", + "Enabled": "Up", + "Status": "Up", + "TypeName": "ethernetCsmacd", + "Speed": 10000000000.0, + "MAC": "DE68F1A6C467", + "MTU": 1500, + }, + { + "NodeID": 1, + "Name": "TenGigabitEthernet0/0/1", + "Enabled": "Up", + "Status": "Up", + "TypeName": "ethernetCsmacd", + "Speed": 10000000000.0, + "MAC": "DE68F1A6C468", + "MTU": 1500, + }, + ] + } + expected = { + 1: { + "NodeHostname": "Router01", + "NodeID": 1, + "interfaces": { + "TenGigabitEthernet0/0/0": { + "Name": "TenGigabitEthernet0/0/0", + "Enabled": "Up", + "Status": "Up", + "TypeName": "ethernetCsmacd", + "Speed": 10000000000.0, + "MAC": "DE68F1A6C467", + "MTU": 1500, + }, + "TenGigabitEthernet0/0/1": { + "Name": "TenGigabitEthernet0/0/1", + "Enabled": "Up", + "Status": "Up", + "TypeName": "ethernetCsmacd", + "Speed": 10000000000.0, + "MAC": "DE68F1A6C468", + "MTU": 1500, + }, + }, + }, + 2: {"NodeHostname": "Switch01", "NodeID": 2}, + } + self.test_client.gather_interface_data( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.test_client.query.assert_called_once_with( + "\n SELECT n.NodeID,\n sa.StatusName AS Enabled,\n so.StatusName AS Status,\n i.Name,\n i.MAC,\n i.Speed,\n i.TypeName,\n i.MTU\n FROM Orion.Nodes n JOIN Orion.NPM.Interfaces i ON n.NodeID = i.NodeID INNER JOIN Orion.StatusInfo sa ON i.AdminStatus = sa.StatusId INNER JOIN Orion.StatusInfo so ON i.OperStatus = so.StatusId\n WHERE n.NodeID IN (\n '1','2')" + ) + self.assertEqual(self.node_details, expected) + + def test_gather_interface_data_failure(self): + """Validate functionality of gather_interface_data() when no information is returned.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = {"results": []} + self.test_client.gather_interface_data( + 
node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.job.logger.error.assert_called_once_with("Error: No node details found for the batch of nodes") + + node_types = [ + ( + "catalyst", + { + "Vendor": "Cisco", + "DeviceType": "Catalyst 9500-48Y4C", + "Model": "C9500-48Y4C", + }, + "WS-C9500-48Y4C", + ), + ( + "blank_model", + {"Vendor": "Cisco", "DeviceType": "Cisco Catalyst 3560CG-8PC-S", "Model": ""}, + "WS-C3560CG-8PC-S", + ), + ( + "space_model", + {"Vendor": "Cisco", "DeviceType": "Cisco Catalyst 4500X-32 SFP+ Switch", "Model": " "}, + "WS-C4500X-32 SFP+ Switch", + ), + ("both_blank", {"Vendor": "Cisco", "DeviceType": "", "Model": ""}, ""), + ] + + @parameterized.expand(node_types, skip_on_empty=True) + def test_standardize_device_type(self, name, sent, received): # pylint: disable=unused-argument + """Validate functionality of standardize_device_type().""" + result = self.test_client.standardize_device_type(node=sent) + self.assertEqual(result, received) + + intf_types = [ + ("standard_tengig", {"TypeName": "ethernetCsmacd", "Name": "TenGigabitEthernet0/0/0"}, "10gbase-t"), + ("ethernet_speed", {"TypeName": "ethernetCsmacd", "Name": "Ethernet0/0", "Speed": 100000000.0}, "100base-tx"), + ("virtual", {"TypeName": "propVirtual", "Name": "PortChannel10"}, "virtual"), + ] + + @parameterized.expand(intf_types, skip_on_empty=True) + def test_determine_interface_type(self, name, sent, received): # pylint: disable=unused-argument + """Validate functionality of determine_interface_type().""" + result = self.test_client.determine_interface_type(interface=sent) + self.assertEqual(result, received) + + def test_determine_interface_type_failure(self): + """Validate functionality of determine_interface_type() when can't determine type.""" + test_intf = {"TypeName": "ethernetCsmacd", "Name": "Management", "Speed": 1.0} + result = self.test_client.determine_interface_type(interface=test_intf) + self.assertEqual(result, "virtual") + self.job.logger.debug.assert_called_once_with("Unable to find Ethernet interface in map: Management") + + test_versions = [ + ("release_software", "17.6.5, RELEASE SOFTWARE (fc2)", "17.6.5"), + ("copyright_software", "4.2(2f), Copyright (c) 2008-2022, Cisco Systems, Inc.", "4.2(2f)"), + ("release_no_comma", "03.11.01.E RELEASE SOFTWARE (fc4)", "03.11.01.E"), + ("copyright_no_comma", "4.0(4b) Copyright (c) 2008-2019, Cisco Systems, Inc.", "4.0(4b)"), + ] + + @parameterized.expand(test_versions, skip_on_empty=True) + def test_extract_version(self, name, sent, received): # pylint: disable=unused-argument + """Validate functionality of the extract_version() method.""" + result = self.test_client.extract_version(version=sent) + self.assertEqual(result, received) + + def test_gather_ipaddress_data_success(self): + """Validate functionality of gather_ipaddress_data() when data is returned.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = { + "results": [ + { + "NodeID": 1, + "IPAddress": "10.0.0.1", + "IPAddressType": "IPv4", + "Name": "Ethernet0/1", + "SubnetMask": "", + }, + { + "NodeID": 1, + "IPAddress": "2001:db8:::", + "IPAddressType": "IPv6", + "Name": "Ethernet0/2", + "SubnetMask": 32, + }, + { + "NodeID": 2, + "IPAddress": "192.168.0.1", + "IPAddressType": "IPv4", + "Name": "GigabitEthernet0/1", + "SubnetMask": "255.255.255.0", + }, + ] + } + expected = { + 1: { + "NodeHostname": "Router01", + "NodeID": 1, + "ipaddrs": { + "10.0.0.1": { + "IPAddress": "10.0.0.1", + "SubnetMask": 32, + "IPAddressType": 
"IPv4", + "IntfName": "Ethernet0/1", + }, + "2001:db8:::": { + "IPAddress": "2001:db8:::", + "SubnetMask": 128, + "IPAddressType": "IPv6", + "IntfName": "Ethernet0/2", + }, + }, + }, + 2: { + "NodeHostname": "Switch01", + "NodeID": 2, + "ipaddrs": { + "192.168.0.1": { + "IPAddress": "192.168.0.1", + "SubnetMask": 24, + "IPAddressType": "IPv4", + "IntfName": "GigabitEthernet0/1", + } + }, + }, + } + self.test_client.gather_ipaddress_data( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.test_client.query.assert_called_once_with( + "\n SELECT NIPA.NodeID,\n NIPA.InterfaceIndex,\n NIPA.IPAddress,\n NIPA.IPAddressType,\n NPMI.Name,\n NIPA.SubnetMask\n FROM Orion.NodeIPAddresses NIPA INNER JOIN Orion.NPM.Interfaces NPMI ON NIPA.NodeID=NPMI.NodeID AND NIPA.InterfaceIndex=NPMI.InterfaceIndex INNER JOIN Orion.Nodes N ON NIPA.NodeID=N.NodeID\n WHERE NIPA.NodeID IN (\n '1','2')" + ) + self.assertEqual(self.node_details, expected) + + def test_gather_ipaddress_data_failure(self): + """Validate functionality of gather_ipaddress_data() when no information is returned.""" + self.test_client.query = MagicMock() + self.test_client.query.return_value = {"results": []} + self.test_client.gather_ipaddress_data( + node_data=self.test_nodes, node_details=self.node_details, nodes_per_batch=10 + ) + self.job.logger.error.assert_called_once_with("Error: No node details found for the batch of nodes") + + def test_determine_role_from_devicetype_success(self): + """Validate successful functionality of determine_role_from_devicetype().""" + result = determine_role_from_devicetype(device_type="ASR1001", role_map={"ASR1001": "Router"}) + self.assertEqual(result, "Router") + + def test_determine_role_from_devicetype_failure(self): + """Validate functionality of determine_role_from_devicetype() when match isn't found.""" + result = determine_role_from_devicetype(device_type="Cat3k", role_map={"ASR1001": "Router"}) + self.assertEqual(result, "") + + def test_determine_role_from_hostname_success(self): + """Validate successful functionality of determine_role_from_hostname().""" + result = determine_role_from_hostname(hostname="core-router.test.com", role_map={".*router.*": "Router"}) + self.assertEqual(result, "Router") + + def test_determine_role_from_hostname_failure(self): + """Validate functionality of determine_role_from_hostname() when match not found.""" + result = determine_role_from_hostname(hostname="distro-switch.test.com", role_map={".*router.*": "Router"}) + self.assertEqual(result, "") diff --git a/poetry.lock b/poetry.lock index c7cbc1cd..6b8361c2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" @@ -3645,6 +3645,20 @@ files = [ [package.extras] dev = ["black", "mypy", "pytest"] +[[package]] +name = "orionsdk" +version = "0.4.0" +description = "Python API for the SolarWinds Orion SDK" +optional = true +python-versions = "*" +files = [ + {file = "orionsdk-0.4.0.tar.gz", hash = "sha256:129ab44f15ee5c4d881715398854410e26efe18b1e5c59e6962231d793091165"}, +] + +[package.dependencies] +requests = "*" +six = "*" + [[package]] name = "packaging" version = "23.2" @@ -5911,7 +5925,7 @@ type = ["pytest-mypy"] [extras] aci = ["PyYAML"] -all = ["Jinja2", "PyYAML", "cloudvision", "cvprac", "dnacentersdk", "dnspython", "ijson", "ipfabric", "meraki", "nautobot-device-lifecycle-mgmt", "netutils", "oauthlib", "python-magic", "pytz", "requests", "requests-oauthlib", "six", "slurpit-sdk", "urllib3"] +all = ["Jinja2", "PyYAML", "cloudvision", "cvprac", "dnacentersdk", "dnspython", "ijson", "ipfabric", "meraki", "nautobot-device-lifecycle-mgmt", "netutils", "oauthlib", "orionsdk", "python-magic", "pytz", "requests", "requests-oauthlib", "six", "slurpit-sdk", "urllib3"] aristacv = ["cloudvision", "cvprac"] bootstrap = ["pytz"] citrix-adm = ["netutils", "requests", "urllib3"] @@ -5924,8 +5938,9 @@ nautobot-device-lifecycle-mgmt = ["nautobot-device-lifecycle-mgmt"] pysnow = ["ijson", "oauthlib", "python-magic", "pytz", "requests", "requests-oauthlib", "six"] servicenow = ["Jinja2", "PyYAML", "ijson", "oauthlib", "python-magic", "pytz", "requests", "requests-oauthlib", "six"] slurpit = ["slurpit-sdk"] +solarwinds = ["orionsdk"] [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.13" -content-hash = "686a1e20a00ecbef1ac2e16c1ef9e295a3cc164ef0eabb5ee1507ad01c10b27c" +content-hash = "d7628d290402205c1846cedcf57baa7ae765d3a8acb672161e33bb4ac298c2b3" diff --git a/pyproject.toml b/pyproject.toml index 16dec565..03728d49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ retry = "^0.9.2" dnacentersdk = { version = "^2.5.6", optional = true } meraki = { version = "^1.37.2,<1.46.0", optional = true } slurpit-sdk = { version = "^0.9.58", optional = true } +orionsdk = { version = "^0.4.0", optional = true } [tool.poetry.group.dev.dependencies] coverage = "*" @@ -115,6 +116,7 @@ all = [ "nautobot-device-lifecycle-mgmt", "netutils", "oauthlib", + "orionsdk", "python-magic", "pytz", "requests", @@ -158,6 +160,9 @@ meraki = [ slurpit = [ "slurpit_sdk", ] +solarwinds = [ + "orionsdk", +] # pysnow = "^0.7.17" # PySNow is currently pinned to an older version of pytz as a dependency, which blocks compatibility with newer # versions of Nautobot. See https://github.com/rbw/pysnow/pull/186