Skip to content

Commit

Permalink
Use wait_until function for retry
Browse files Browse the repository at this point in the history
  • Loading branch information
smagarwal-arista committed Nov 6, 2024
1 parent 8b85062 commit c52c7a7
Showing 1 changed file with 53 additions and 45 deletions.
98 changes: 53 additions & 45 deletions tests/platform_tests/api/test_psu.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tests.common.utilities import skip_release
from tests.platform_tests.cli.util import get_skip_mod_list
from .platform_api_test_base import PlatformApiTestBase
from tests.common.utilities import skip_release_for_platform
from tests.common.utilities import skip_release_for_platform, wait_until


###################################################
Expand All @@ -32,7 +32,6 @@
STATUS_LED_COLOR_AMBER = "amber"
STATUS_LED_COLOR_RED = "red"
STATUS_LED_COLOR_OFF = "off"
MAX_ATTEMPTS = 3


class TestPsuApi(PlatformApiTestBase):
Expand Down Expand Up @@ -215,51 +214,60 @@ def test_power(self, duthosts, enum_rand_one_per_hwsku_hostname, localhost, plat
''' PSU power test '''
duthost = duthosts[enum_rand_one_per_hwsku_hostname]
skip_release_for_platform(duthost, ["202012", "201911", "201811"], ["arista"])
voltage = current = power = None
psu_info = {
"duthost": duthost,
"api": platform_api_conn,
"psu_id": None
}

def check_psu_power(failure_count):
nonlocal voltage
nonlocal current
nonlocal power
voltage = self.get_psu_parameter(psu_info, "voltage", psu.get_voltage, "voltage")
current = self.get_psu_parameter(psu_info, "current", psu.get_current, "current")
power = self.get_psu_parameter(psu_info, "power", psu.get_power, "power")

failure_occured = self.get_len_failed_expectations() > failure_count
if current and voltage and power:
is_within_tolerance = abs(power - (voltage*current)) < power*0.1
if not failure_occured and not is_within_tolerance:
return False

self.expect(is_within_tolerance, "PSU {} reading does not make sense \
(power:{}, voltage:{}, current:{})".format(psu_id, power, voltage, current))

return True

for psu_id in range(self.num_psus):
failure_count = self.get_len_failed_expectations()
failure_occured = False
psu_info = {
"duthost": duthost,
"api": platform_api_conn,
"psu_id": psu_id
}
for i in range(0, MAX_ATTEMPTS):
name = psu.get_name(platform_api_conn, psu_id)
if name in self.psu_skip_list:
logger.info("skipping check for {}".format(name))
else:
voltage = self.get_psu_parameter(psu_info, "voltage", psu.get_voltage, "voltage")
current = self.get_psu_parameter(psu_info, "current", psu.get_current, "current")
power = self.get_psu_parameter(psu_info, "power", psu.get_power, "power")

failure_occured = self.get_len_failed_expectations() > failure_count
if current is not None and voltage is not None and power is not None:
is_within_tolerance = abs(power - (voltage*current)) < power*0.1
if not failure_occured and not is_within_tolerance and i < MAX_ATTEMPTS - 1:
logger.info("Retrying test for {} as readings not within tolerance level".format(name))
continue
self.expect(is_within_tolerance, "PSU {} reading does not make sense \
(power:{}, voltage:{}, current:{})".format(psu_id, power, voltage, current))

self.get_psu_parameter(psu_info, "max_power", psu.get_maximum_supplied_power,
"maximum supplied power")

powergood_status = psu.get_powergood_status(platform_api_conn, psu_id)
if self.expect(powergood_status is not None,
"Failed to retrieve operational status of PSU {}".format(psu_id)):
self.expect(powergood_status is True, "PSU {} is not operational".format(psu_id))

high_threshold = self.get_psu_parameter(psu_info, "voltage_high_threshold",
psu.get_voltage_high_threshold, "high voltage threshold")
low_threshold = self.get_psu_parameter(psu_info, "voltage_low_threshold",
psu.get_voltage_low_threshold, "low voltage threshold")

if high_threshold is not None and low_threshold is not None:
self.expect(voltage < high_threshold and voltage > low_threshold,
"Voltage {} of PSU {} is not in between {} and {}"
.format(voltage, psu_id, low_threshold, high_threshold))

break
psu_info['psu_id'] = psu_id
name = psu.get_name(platform_api_conn, psu_id)
if name in self.psu_skip_list:
logger.info("skipping check for {}".format(name))
else:
check_result = wait_until(30, 10, 0, check_psu_power, failure_count)
self.expect(check_result, "PSU {} reading does not make sense \
(power:{}, voltage:{}, current:{})".format(psu_id, power, voltage, current))

self.get_psu_parameter(psu_info, "max_power", psu.get_maximum_supplied_power,
"maximum supplied power")

powergood_status = psu.get_powergood_status(platform_api_conn, psu_id)
if self.expect(powergood_status is not None,
"Failed to retrieve operational status of PSU {}".format(psu_id)):
self.expect(powergood_status is True, "PSU {} is not operational".format(psu_id))

high_threshold = self.get_psu_parameter(psu_info, "voltage_high_threshold",
psu.get_voltage_high_threshold, "high voltage threshold")
low_threshold = self.get_psu_parameter(psu_info, "voltage_low_threshold",
psu.get_voltage_low_threshold, "low voltage threshold")

if high_threshold and low_threshold:
self.expect(voltage < high_threshold and voltage > low_threshold,
"Voltage {} of PSU {} is not in between {} and {}"
.format(voltage, psu_id, low_threshold, high_threshold))

self.assert_expectations()

Expand Down

0 comments on commit c52c7a7

Please sign in to comment.