Skip to content

Commit

Permalink
Updated support for get_snmp_information, get_ntp_servers, get_config…
Browse files Browse the repository at this point in the history
…, and ping, as well as the related helper functions.

Updated supportability in the README.md to reflect these changes.
  • Loading branch information
alvinc13 committed Oct 2, 2020
1 parent dc5084a commit 7a00d34
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 18 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ may have not been properly tested in older versions.

## Current Support Functionality
* get_arp_table - Get the ARP table from a device.
* get_bgp_neighbors - Get the BGP Neighbors from a device.
* get_bgp_neighbors_detail - Get a detailed BGP neighbor from a device.
* get_config - Get configuration from the device.
* get_facts - Get the version, serial number, vendor, model, and uptime from a device.
* get_interfaces - Get list of interfaces from a device.
Expand Down
265 changes: 250 additions & 15 deletions napalm_aoscx/aoscx.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
transform_lldp_capab,
textfsm_extractor,
)
import napalm.base.constants as C
import napalm.base.constants as c

# Aruba AOS-CX lib
import pyaoscx
Expand All @@ -67,6 +67,7 @@ def __init__(self, hostname, username, password, timeout=60, optional_args=None)
self.profile = [self.platform]
self.session_info = {}
self.isAlive = False
self.candidate_config = ''

self.base_url = "https://{0}/rest/v1/".format(self.hostname)

Expand Down Expand Up @@ -540,27 +541,205 @@ def get_mac_address_table(self):
)
return mac_entries

def get_snmp_information(self):
"""
Implementation of NAPALM method get_snmp_information. This returns a dict of dicts containing SNMP
configuration.
:return: Returns a lists of dictionaries. Each inner dictionary contains these fields:
* chassis_id (string)
* community (dictionary with community string specific information)
* acl (string) # acl number or name (Unsupported)
* mode (string) # read-write (rw), read-only (ro) (Unsupported)
* contact (string)
* location (string)
"""
snmp_dict = {
"chassis_id": "",
"community": {},
"contact": "",
"location": ""
}

systeminfo = pyaoscx.system.get_system_info(**self.session_info)
productinfo = pyaoscx.system.get_product_info(**self.session_info)

communities_dict = {}
for community_name in systeminfo['snmp_communities']:
communities_dict[community_name] = {
'acl': '',
'mode': ''
}

snmp_dict['chassis_id'] = productinfo['product_info']['serial_number']
snmp_dict['community'] = communities_dict
snmp_dict['contact'] = systeminfo['other_config']['system_contact']
snmp_dict['location'] = systeminfo['other_config']['system_location']

return snmp_dict

def get_ntp_servers(self):
"""
Implementation of NAPALM method get_ntp_servers. Returns the NTP servers configuration as dictionary.
The keys of the dictionary represent the IP Addresses of the servers.
Note: Inner dictionaries do not have yet any available keys.
:return: A dictionary with keys that are the NTP associations.
"""
return self._get_ntp_associations(**self.session_info)

def get_config(self, retrieve="all", full=False):
"""
Return the configuration of a device.
Return the configuration of a device. Currently this is limited to JSON format
Args:
retrieve(string): Which configuration type you want to populate, default is all of them.
:param retrieve: String to determine which configuration type you want to retrieve, default is all of them.
The rest will be set to "".
full(bool): Retrieve all the configuration. For instance, on ios, "sh run all".
:param full: Boolean to retrieve all the configuration. (Not supported)
:return: The object returned is a dictionary with a key for each configuration store:
- running(string) - Representation of the native running configuration
- candidate(string) - Representation of the candidate configuration.
- startup(string) - Representation of the native startup configuration.
"""
if retrieve not in ["running", "candidate", "startup", "all"]:
raise Exception("ERROR: Not a valid option to retrieve.\nPlease select from 'running', 'candidate', "
"'startup', or 'all'")
else:
config_dict = {
"running": "",
"startup": "",
"candidate": ""
}
if retrieve in ["running", "all"]:
config_dict['running'] = self._get_json_configuration("running-config")
if retrieve in ["startup", "all"]:
config_dict['startup'] = self._get_json_configuration("startup-config")
if retrieve in ["candidate", "all"]:
config_dict['candidate'] = self.candidate_config

return config_dict


def ping(self, destination, source=c.PING_SOURCE, ttl=c.PING_TTL, timeout=c.PING_TIMEOUT, size=c.PING_SIZE,
count=c.PING_COUNT, vrf=c.PING_VRF):
"""
Executes ping on the device and returns a dictionary with the result. Currently only IPv4 is supported.
:param destination: Host or IP Address of the destination
:param source (optional): Source address of echo request (Not Supported)
:param ttl (optional): Maximum number of hops (Not Supported)
:param timeout (optional): Maximum seconds to wait after sending final packet
:param size (optional): Size of request (bytes)
:param count (optional): Number of ping request to send
:return: Output dictionary that has one of following keys:
* error
* success - In case of success, inner dictionary will have the followin keys:
* probes_sent (int)
* packet_loss (int)
* rtt_min (float)
* rtt_max (float)
* rtt_avg (float)
* rtt_stddev (float)
* results (list)
* ip_address (str)
* rtt (float)
"""
ping_results = self._ping_destination(destination, is_ipv4=True, data_size=size, time_out=timeout,
interval=2, reps=count, time_stamp=False, record_route=False,
vrf=vrf, **self.session_info)

full_results = ping_results['statistics']
transmitted = 0
loss = 0
rtt_min = 0.0
rtt_avg = 0.0
rtt_max = 0.0
rtt_mdev = 0.0

lines = full_results.split('\n')
for count, line in enumerate(lines):
cell = line.split(' ')
if count == 1:
transmitted = cell[0]
loss = cell[5]
loss = int(loss[:-1]) #Shave off the %
if count == 2:
numbers = cell[3].split('/')
rtt_min = numbers[0]
rtt_avg = numbers[1]
rtt_max = numbers[2]
rtt_mdev = numbers[3]

output_dict = {}
results_list = []
if loss < 100:
results_list.append(
{
'ip_address': destination,
'rtt': rtt_avg
}
)

Returns:
The object returned is a dictionary with a key for each configuration store:
output_dict['success'] = {
'probes_sent': transmitted,
'packet_loss': loss,
'rtt_min': rtt_min,
'rtt_max': rtt_max,
'rtt_avg': rtt_avg,
'rtt_stddev': rtt_mdev,
'results': results_list
}
else:
output_dict['error'] = 'unknown host {}'.format(destination)

- running(string) - Representation of the native running configuration
- candidate(string) - Representation of the native candidate configuration. If the
device doesnt differentiate between running and startup configuration this will an
empty string
- startup(string) - Representation of the native startup configuration. If the
device doesnt differentiate between running and startup configuration this will an
empty string
return output_dict

def _ping_destination(self, ping_target, is_ipv4=True, data_size=100, time_out=2, interval=2,
reps=5, time_stamp=False, record_route=False, vrf="default", **kwargs):
"""
Perform a Ping command to a specified destination
:param ping_target: Destination address as a string
:param is_ipv4: Boolean True if the destination is an IPv4 address
:param data_size: Integer for packet size in bytes
:param time_out: Integer for timeout value
:param interval: Integer for time between packets in seconds
:param reps: Integer for the number of signals sent in repetition
:param time_stamp: Boolean True if the time stamp should be included in the results
:param record_route: Boolean True if the route taken should be recorded in the results
:param vrf: String of the VRF name that the ping should be sent. If using the Management VRF, set this to mgmt
:param kwargs:
keyword s: requests.session object with loaded cookie jar
keyword url: URL in main() function
:return: Dictionary containing fan information
"""
raise NotImplementedError

target_url = kwargs["url"] + "ping?"
print(str(ping_target))
if not ping_target:
raise Exception("ERROR: No valid ping target set")
else:
target_url += 'ping_target={}&'.format(str(ping_target))
target_url += 'is_ipv4={}&'.format(str(is_ipv4))
target_url += 'data_size={}&'.format(str(data_size))
target_url += 'ping_time_out={}&'.format(str(time_out))
target_url += 'ping_interval={}&'.format(str(interval))
target_url += 'ping_repetitions={}&'.format(str(reps))
target_url += 'include_time_stamp={}&'.format(str(time_stamp))
target_url += 'record_route={}&'.format(str(record_route))
if vrf == 'mgmt':
target_url += 'mgmt=true'
else:
target_url += 'mgmt=false'

response = kwargs["s"].get(target_url, verify=False)

if not common_ops._response_ok(response, "GET"):
logging.warning("FAIL: Ping failed with status code %d: %s"
% (response.status_code, response.text))
ping_dict = {}
else:
logging.info("SUCCESS: Ping succeeded")
ping_dict = response.json()

return ping_dict

def _get_fan_info(self, params={}, **kwargs):
"""
Expand Down Expand Up @@ -665,3 +844,59 @@ def _get_resource_utilization(self, params={}, **kwargs):
resources_dict = response.json()

return resources_dict

def _get_ntp_associations(self, params={}, **kwargs):
"""
Perform a GET call to get the NTP associations across all VRFs
:param params: Dictionary of optional parameters for the GET request
:param kwargs:
keyword s: requests.session object with loaded cookie jar
keyword url: URL in main() function
:return: Dictionary containing all of the NTP associations on the switch
"""

target_url = kwargs["url"] + "system/vrfs/*/ntp_associations"

response = kwargs["s"].get(target_url, params=params, verify=False)

associations_dict = {}
for server_uri in response:
server_name = server_uri[(server_uri.rfind('/') + 1):] # Takes string after last '/'
associations_dict[server_name] = {}

if not common_ops._response_ok(response, "GET"):
logging.warning("FAIL: Getting dictionary of resource utilization info failed with status code %d: %s"
% (response.status_code, response.text))
associations_dict = {}
else:
logging.info("SUCCESS: Getting dictionary of resource utilization information succeeded")
associations_dict = response.json()

return associations_dict

def _get_json_configuration(self, checkpoint="running-config", params={}, **kwargs):
"""
Perform a GET call to retrieve a configuration file based off of the checkpoint name
:param checkpoint: String name of the checkpoint configuration
:param params: Dictionary of optional parameters for the GET request
:param kwargs:
keyword s: requests.session object with loaded cookie jar
keyword url: URL in main() function
:return: JSON format of the configuration
"""

target_url = kwargs["url"] + "fullconfigs/{}".format(checkpoint)

response = kwargs["s"].get(target_url, params=params, verify=False)

if not common_ops._response_ok(response, "GET"):
logging.warning("FAIL: Getting configuration checkpoint named %s failed with status code %d: %s"
% (checkpoint, response.status_code, response.text))
configuration_json = {}
else:
logging.info("SUCCESS: Getting configuration checkpoint named %s succeeded" % checkpoint)
configuration_json = response.json()

return configuration_json
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="napalm-aruba-cx",
version="0.1.2",
version="0.1.3",
author="Aruba Automation",
author_email="[email protected]",
description="NAPALM drivers for Aruba AOS-CX Switches",
Expand Down

0 comments on commit 7a00d34

Please sign in to comment.