Skip to content

Commit

Permalink
Implement SMI detection using the standard deviation
Browse files Browse the repository at this point in the history
Signed-off-by: Carles Pey <[email protected]>
  • Loading branch information
cpey committed Nov 17, 2024
1 parent 528e48a commit eac7a23
Showing 1 changed file with 53 additions and 12 deletions.
65 changes: 53 additions & 12 deletions chipsec/modules/tools/smm/smm_ptr.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import os
import sys
import time
import math

from chipsec.module_common import BaseModule
from chipsec.library.returncode import ModuleResult
Expand Down Expand Up @@ -145,12 +146,12 @@
# very obscure option, don't even try to understand
GPR_2ADDR = False

# Defines the time percentage increase at which the SMI call is considered to
# be long-running
OUTLIER_THRESHOLD = 33.3
# Defines the threshold in standard deviations at which the SMI call is
# considered long-running
OUTLIER_STD_DEV = 2

# Scan mode delay before retry
SCAN_MODE_RETRY_DELAY = 0.01
# Number of samples used for initial calibration
SCAN_CALIB_SAMPLES = 50

# SMI count MSR
MSR_SMI_COUNT = 0x00000034
Expand Down Expand Up @@ -207,6 +208,12 @@ def __init__(self):
self.helper = OsHelper().get_default_helper()
self.helper.init()
self.smi_count = self.get_smi_count()
self.needs_calibration = True
self.calib_samples = 0
self.stdev = 0
self.m2 = 0
self.stdev_hist = 0
self.m2_hist = 0

def __del__(self):
self.helper.close()
Expand Down Expand Up @@ -254,6 +261,10 @@ def clear(self):
self.outliers = 0
self.code = None
self.confirmed = False
self.needs_calibration = True
self.calib_samples = 0
self.stdev = 0
self.m2 = 0

def add(self, duration, code, data, gprs, confirmed=False):
if not self.code:
Expand All @@ -262,6 +273,7 @@ def add(self, duration, code, data, gprs, confirmed=False):
if not outlier:
self.acc_smi_duration += duration
self.acc_smi_num += 1
self.update_stdev(duration)
if duration > self.max.duration:
self.max.update(duration, code, data, gprs.copy())
elif duration < self.min.duration:
Expand All @@ -281,24 +293,49 @@ def avg(self):
self.acc_smi_duration = 0
self.acc_smi_num = 0

#
# Computes the standard deviation using the Welford's online algorithm
#
def update_stdev(self, value):
difference = value - self.avg_smi_duration
self.difference_hist = value - self.hist_smi_duration
self.avg()
self.m2 += difference * (value - self.avg_smi_duration)
self.m2_hist += self.difference_hist * (value - self.hist_smi_duration)
variance = self.m2 / self.avg_smi_num
variance_hist = self.m2_hist / self.hist_smi_num
self.stdev = math.sqrt(variance)
self.stdev_hist = math.sqrt(variance_hist)

def update_calibration(self, duration):
if not self.needs_calibration:
return
self.acc_smi_duration += duration
self.acc_smi_num += 1
self.update_stdev(duration)
self.calib_samples += 1
if self.calib_samples >= SCAN_CALIB_SAMPLES:
self.needs_calibration = False

def is_slow_outlier(self, value):
ret = False
if self.avg_smi_duration and value > self.avg_smi_duration * (1 + OUTLIER_THRESHOLD / 100):
if value > self.avg_smi_duration + OUTLIER_STD_DEV * self.stdev:
ret = True
if self.hist_smi_duration and value > self.hist_smi_duration * (1 + OUTLIER_THRESHOLD / 100):
if value > self.hist_smi_duration + OUTLIER_STD_DEV * self.stdev_hist:
ret = True
return ret

def is_fast_outlier(self, value):
ret = False
if self.avg_smi_duration and value < self.avg_smi_duration * (1 - OUTLIER_THRESHOLD / 100):
if value < self.avg_smi_duration - OUTLIER_STD_DEV * self.stdev:
ret = True
if self.hist_smi_duration and value < self.hist_smi_duration * (1 - OUTLIER_THRESHOLD / 100):
if value < self.hist_smi_duration - OUTLIER_STD_DEV * self.stdev_hist:
ret = True
return ret

def is_outlier(self, value):
self.avg()
if self.needs_calibration:
return False
ret = False
if self.is_slow_outlier(value):
ret = True
Expand Down Expand Up @@ -501,13 +538,17 @@ def smi_fuzz_iter(self, thread_id, _addr, _smi_desc, fill_contents=True, restore
else:
while True:
_, duration = self.send_smi_timed(thread_id, _smi_desc.smi_code, _smi_desc.smi_data, _smi_desc.name, _smi_desc.desc, _rax, _rbx, _rcx, _rdx, _rsi, _rdi)
if scan.valid_smi_count():
if not scan.valid_smi_count():
continue
if scan.needs_calibration:
scan.update_calibration(duration)
continue
else:
break
#
# Re-do the call if it was identified as an outlier, due to periodic SMI delays
#
if scan.is_outlier(duration):
time.sleep(SCAN_MODE_RETRY_DELAY)
while True:
_, duration = self.send_smi_timed(thread_id, _smi_desc.smi_code, _smi_desc.smi_data, _smi_desc.name, _smi_desc.desc, _rax, _rbx, _rcx, _rdx, _rsi, _rdi)
if scan.valid_smi_count():
Expand Down

0 comments on commit eac7a23

Please sign in to comment.