From 84f51e0b87be3213357e0ef39a83c9c7370300ce Mon Sep 17 00:00:00 2001 From: Jack He Date: Wed, 13 Nov 2024 17:11:46 -0800 Subject: [PATCH 1/2] unify core python scripts --- pipeline/generate_historical_data.py | 222 ++++++++++++++++++ pipeline/regression.py | 325 +++++++++++++++++++++++++++ pipeline/sql.py | 194 ++++++++++++++++ 3 files changed, 741 insertions(+) create mode 100644 pipeline/generate_historical_data.py create mode 100644 pipeline/regression.py create mode 100644 pipeline/sql.py diff --git a/pipeline/generate_historical_data.py b/pipeline/generate_historical_data.py new file mode 100644 index 00000000..42682aff --- /dev/null +++ b/pipeline/generate_historical_data.py @@ -0,0 +1,222 @@ +import sqlite3 +import requests +import os +import json + +# Load the sqlite file +# URL of the remote SQLite file +url = 'https://raw.githubusercontent.com/microsoft/netperf/sqlite/netperf.sqlite' + +HISTORY_LENGTH = 20 + +# Check if file already exists + +if (os.path.exists('netperf.sqlite')): + print("File already exists") +else: + # Download the file + response = requests.get(url) + if response.status_code == 200: + with open('netperf.sqlite', 'wb') as file: + file.write(response.content) + +# Now, open the local copy with sqlite3 +conn = sqlite3.connect('netperf.sqlite') +cursor = conn.cursor() + +""" +General Shape of data: + + { + ---- : { + : { + run_args: "...", + # last N commits, sorted in desc order, choose best out of all runs + data: [ ... { result: X, os_version: Y, commit: Z, build_date_time: ... } ... ] + } + } + ... + } + +""" + +cursor.execute("SELECT OS_name, Architecture, Context FROM Environment GROUP BY OS_name, Architecture, Context") +environment_groups = cursor.fetchall() +cursor.execute("SELECT * FROM Secnetperf_tests") +all_secnetperf_tests = cursor.fetchall() +detailed_throughput_page_json = {} +detailed_rps_page_json = {} +detailed_hps_page_json = {} + +def execute_results_query(cursor, os_name, arch, context, io, tls, test_id, HISTORY_LENGTH): + # NOTE: this query assumes implicitly that client environment ID = server environment ID. + cursor.execute(f""" + SELECT MAX(Result), Secnetperf_test_runs.Secnetperf_commit, OS_version, Build_date_time, Run_date FROM Secnetperf_test_runs + JOIN Secnetperf_builds ON Secnetperf_builds.Secnetperf_commit = Secnetperf_test_runs.Secnetperf_commit + JOIN Environment ON Environment.Environment_ID = Secnetperf_test_runs.Client_environment_ID + WHERE OS_name = "{os_name}" AND Architecture = "{arch}" AND Context = "{context}" AND io = "{io}" AND tls = "{tls}" AND Secnetperf_test_ID = "{test_id}" + GROUP BY Secnetperf_test_runs.Secnetperf_commit + ORDER BY Build_date_time DESC, Run_date DESC + LIMIT {HISTORY_LENGTH} + """) + return cursor.fetchall() + +def execute_latency_query(cursor, os_name, arch, context, io, tls, test_id, HISTORY_LENGTH): + # NOTE: Notice the use of MIN(P0) - this is because we only save the full latency curves for the minimum P0 run out of 3 runs. 
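+    # Each returned row is (commit, latency_stats_ID, OS_version, Build_date_time, Run_date, P0, P50, P90, P99, P999, P9999, P99999, P999999),
+    # one row per commit, newest builds first, capped at HISTORY_LENGTH.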
+ cursor.execute(f""" + SELECT Secnetperf_test_runs.Secnetperf_commit, Secnetperf_test_runs.Secnetperf_latency_stats_ID, OS_version, Build_date_time, Run_date, MIN(P0), P50, P90, P99, P999, P9999, P99999, P999999 FROM Secnetperf_test_runs + JOIN Secnetperf_builds ON Secnetperf_builds.Secnetperf_commit = Secnetperf_test_runs.Secnetperf_commit + JOIN Environment ON Environment.Environment_ID = Secnetperf_test_runs.Client_environment_ID + JOIN Secnetperf_latency_stats ON Secnetperf_latency_stats.Secnetperf_latency_stats_ID = Secnetperf_test_runs.Secnetperf_latency_stats_ID + WHERE OS_name = "{os_name}" AND Architecture = "{arch}" AND Context = "{context}" AND io = "{io}" AND tls = "{tls}" AND Secnetperf_test_ID = "{test_id}" + GROUP BY Secnetperf_test_runs.Secnetperf_commit + ORDER BY Build_date_time DESC, Run_date DESC + LIMIT {HISTORY_LENGTH} + """) + return cursor.fetchall() + +requires_archived_throughput_data = False +requires_archived_rps_data = False +requires_archived_hps_data = False +requires_archived_latency_data = False + +def generate_tput_rps_hps_pages(cursor, all_secnetperf_tests, environment_groups, use_archive=False): + global requires_archived_throughput_data + global requires_archived_rps_data + global requires_archived_hps_data + global detailed_throughput_page_json + global detailed_rps_page_json + global detailed_hps_page_json + for test_id, _, run_args in all_secnetperf_tests: + + if use_archive and "scenario" in test_id: + continue + + if not use_archive and "scenario" not in test_id: + continue + + for os_name, arch, context in environment_groups: + for io in ["iocp", "epoll", "wsk", "xdp"]: + for tls in ["schannel", "openssl"]: + env_id_str = f"{os_name}-{arch}-{context}-{io}-{tls}" + if "download" in test_id or "upload" in test_id or "tput" in test_id: + if not requires_archived_throughput_data and use_archive: + continue + data = execute_results_query(cursor, os_name, arch, context, io, tls, test_id, HISTORY_LENGTH) + if not data: + continue + if len(data) < HISTORY_LENGTH: + print("We need more throughput data! Resorting to archive.") + requires_archived_throughput_data = True + if not env_id_str in detailed_throughput_page_json: + detailed_throughput_page_json[env_id_str] = { + f"{test_id}" : { + "run_args" : run_args, + "data": data + } + } + else: + detailed_throughput_page_json[env_id_str][f"{test_id}"] = { + "run_args" : run_args, + "data": data + } + elif "rps" in test_id: + if not requires_archived_rps_data and use_archive: + continue + data = execute_results_query(cursor, os_name, arch, context, io, tls, test_id, HISTORY_LENGTH) + if not data: + continue + if len(data) < HISTORY_LENGTH: + print("We need more rps data! Resorting to archive.") + requires_archived_rps_data = True + if not env_id_str in detailed_rps_page_json: + detailed_rps_page_json[env_id_str] = { + f"{test_id}" : { + "run_args" : run_args, + "data": data + } + } + else: + detailed_rps_page_json[env_id_str][f"{test_id}"] = { + "run_args" : run_args, + "data": data + } + elif "hps" in test_id: + if not requires_archived_hps_data and use_archive: + continue + data = execute_results_query(cursor, os_name, arch, context, io, tls, test_id, HISTORY_LENGTH) + if not data: + continue + if len(data) < HISTORY_LENGTH: + print("We need more hps data! 
Resorting to archive.") + requires_archived_throughput_data = True + if not env_id_str in detailed_hps_page_json: + detailed_hps_page_json[env_id_str] = { + f"{test_id}" : { + "run_args" : run_args, + "data": data + } + } + else: + detailed_hps_page_json[env_id_str][f"{test_id}"] = { + "run_args" : run_args, + "data": data + } + + +detailed_latency_page = {} +def generate_latency_page(cursor, all_secnetperf_tests, environment_groups, use_archive=False): + global requires_archived_latency_data + global detailed_latency_page + for test_id, _, run_args in all_secnetperf_tests: + + if use_archive and "scenario" in test_id: + continue + + if not use_archive and "scenario" not in test_id: + continue + + if not requires_archived_latency_data and use_archive: + continue + + for os_name, arch, context in environment_groups: + for io in ["iocp", "epoll", "wsk", "xdp"]: + for tls in ["schannel", "openssl"]: + data = execute_latency_query(cursor, os_name, arch, context, io, tls, test_id, HISTORY_LENGTH) + if not data: + continue + if len(data) < HISTORY_LENGTH: + requires_archived_latency_data = True + env_id_str = f"{os_name}-{arch}-{context}-{io}-{tls}" + if not env_id_str in detailed_latency_page: + detailed_latency_page[env_id_str] = { + f"{test_id}" : { + "run_args" : run_args, + "data": data + } + } + else: + detailed_latency_page[env_id_str][f"{test_id}"] = { + "run_args" : run_args, + "data": data + } + +generate_tput_rps_hps_pages(cursor, all_secnetperf_tests, environment_groups, use_archive=False) +generate_tput_rps_hps_pages(cursor, all_secnetperf_tests, environment_groups, use_archive=True) +generate_latency_page(cursor, all_secnetperf_tests, environment_groups, use_archive=False) +generate_latency_page(cursor, all_secnetperf_tests, environment_groups, use_archive=True) + +# Save to disk +with open('historical_throughput_page.json', 'w') as file: + json.dump(detailed_throughput_page_json, file) + +with open('historical_rps_page.json', 'w') as file: + json.dump(detailed_rps_page_json, file) + +with open('historical_hps_page.json', 'w') as file: + json.dump(detailed_hps_page_json, file) + +with open('historical_latency_page.json', 'w') as file: + json.dump(detailed_latency_page, file) + +conn.close() diff --git a/pipeline/regression.py b/pipeline/regression.py new file mode 100644 index 00000000..b26d9c66 --- /dev/null +++ b/pipeline/regression.py @@ -0,0 +1,325 @@ +import sqlite3 +import json +import statistics +import argparse +import glob +import os + +# Create the parser +parser = argparse.ArgumentParser(description="Process a feature integer.") + +# Add an integer argument with a default value +parser.add_argument('--featureint', type=int, default=1, help='An integer number (default: 1).') + +# Parse the arguments +args = parser.parse_args() + +# Connect to the database +conn = sqlite3.connect('netperf.sqlite') +cursor = conn.cursor() + +""" + +This script updates the regression parameters AFTER a merge / database commit. + +Singlular datapoint method (featureint == 1) + Brief. + We store the watermark result in our database. + On each new run, we grab the JSON results, and compare that against what we have in the database. + + +Sliding window method (featureint == 2) + Brief. + Watermark-based regression: + ASSUME that the latest run has already been committed to the database. + We maintain the best result we have seen thus far, and a 'noise' component (0 - 1). + We fail a run if the new result is < the best result * (1 - noise) or > the best result * (1 + noise). 
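+    For example, with BestResult = 1000 and Noise = 0.2, baseline = 1000 * (1 - 0.2) = 800, so a new run averaging below 800 is flagged as a regression.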
+ The structure of watermark json looks like: + { + - { + : { + + "BestResult": number + "BestResultCommit": string + "Noise": number + "baseline": BestResult * (1 - noise) + + "LatencyUpperBound" : { + + # TODO: not exactly sure what watermark for latencies should be. + + # For instance, do we use the minimum of EACH respective column? + # That would mean the newest run wouldn't compare against ONE best run in the past, but potentially THREE best runs (MIN(P0), MIN(P50), MIN(P99)) + # Not sure which is the way to go, so just re-use the MEAN - STD model here. + + "P0" : MEAN( last N runs P0 column ) + STD( last N runs P0 column ) + "P50" : MEAN( last N runs ... ) + STD( last N runs ... ) + "P99" : MEAN( last N runs ... ) + STD( last N runs ... ) + } + } + } + } + Mean - Standard Deviation regression: + Similar to watermark-based, except instead of storing BestResult, we store Mean(last N runs) +- STD(last N runs), + and use that as our baseline. +""" + +print("Performing regression analysis to compute upper/lower bounds...") + +# REQUIRE: Database has clean data, and Environments table contain comprehensive data. +def singular_datapoint_method(): + global conn + global cursor + """ + NOTE: For this method, we are not recording a "best ever" latency result as it's unclear how we should compare 2 distributions. + Who is to say one distrobution with a lower P0 but a higher P99 is better than another? + """ + NOISE = 0.3 # We allow future runs to be 30% less than the best-ever result. + + watermark_regression_file = {} + for json_file_path in glob.glob('*.json'): + if not os.path.exists(f"{json_file_path}/{json_file_path}"): + continue + + with open(f"{json_file_path}/{json_file_path}", 'r') as json_file: + print("Processing file: {}".format(json_file_path)) + # Grab data + json_content = json_file.read() + json_obj = json.loads(json_content) + parts = json_file_path.split("-") + assert len(parts) >= 8 + commit = json_obj["commit"] + io = parts[-1] + io = io.split(".")[0] + tls = parts[-2] + arch = parts[-3] + os_name_2 = parts[-4] + os_name_1 = parts[-5] + os_name = os_name_1 + "-" + os_name_2 + context = parts[-6] # lab / azure + for testid in json_obj: + if testid == "commit" or testid == "os_version" or "-lat" in testid or testid == "run_args" or "regression" in testid: + continue + + env_str = f"{os_name}-{arch}-{context}-{io}-{tls}" + + if "rps" in testid: + # NOTE: We are ignoring the percentiles and only considering requests per second because we don't have a way to compare 2 distributions. + result = json_obj[testid] # Looks like [p0, p50 ... RPS, p0, p50, ... RPS], where RPS is every 9th element. + new_result_avg = sum([int(result) for result in result[8::9]]) / len(result[8::9]) # [8::9] grabs every 9th element starting from the 9th element. 
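+                # e.g. with 3 runs the flattened list has 27 entries, and [8::9] picks indices 8, 17 and 26 (the per-run RPS values).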
+ else: + new_result_avg = sum([int(result) for result in json_obj[testid]]) / len(json_obj[testid]) + cursor.execute(f""" + SELECT BestResult, BestResultCommit FROM Secnetperf_tests_watermark WHERE Secnetperf_test_ID = '{testid}' AND environment = '{env_str}' + """) + bestever = new_result_avg + bestever_commit = commit + watermark_so_far = cursor.fetchall() + if not watermark_so_far: + cursor.execute(f""" + INSERT INTO Secnetperf_tests_watermark (Secnetperf_test_ID, environment, BestResult, BestResultCommit) VALUES ('{testid}', '{env_str}', {new_result_avg}, '{bestever_commit}') + """) + conn.commit() + else: + bestever_result = watermark_so_far[0][0] + best_commit_so_far = watermark_so_far[0][1] + if float(new_result_avg) > float(bestever_result): + cursor.execute(f""" + UPDATE Secnetperf_tests_watermark SET BestResult = {new_result_avg}, BestResultCommit = '{bestever_commit}' WHERE Secnetperf_test_ID = '{testid}' AND environment = '{env_str}' + """) + conn.commit() + else: + bestever = bestever_result + bestever_commit = best_commit_so_far + + if testid not in watermark_regression_file: + watermark_regression_file[testid] = {} + + watermark_regression_file[testid][env_str] = { + "BestResult": bestever, + "Noise": NOISE, + "baseline": bestever * (1 - NOISE), + "BestResultCommit": bestever_commit + } + + with open('watermark_regression.json', 'w') as f: + json.dump(watermark_regression_file, f, indent=4) + +def compute_baseline_watermark(test_run_results, test): + # Use a statistical approach to compute a baseline. + """ + test_run_results: [ + ... + ( Result: REAL, Secnetperf_latency_stats_ID?: *optional, INTEGER) + ... + ] + + So if Secnetperf_latency_stats_ID is not NULL, then the test is a latency test, and the Result is RPS. + For computing the baseline of Results (which is ALWAYS a lower bound), we start with a simple approach: + + - Compute the average and subtract 2 standard deviations from the average. + - For datasets with high variance (but still somewhat normally distributed), this approach is robust enough to be used as a baseline. + """ + + baseline = { + "baseline" : None + } + if "rps" in test: + try: + p0 = [run_result[2] for run_result in test_run_results] + p50 = [run_result[3] for run_result in test_run_results] + p99 = [run_result[4] for run_result in test_run_results] + + p0UpperBound = statistics.mean(p0) + 3 * statistics.stdev(p0) + p50UpperBound = statistics.mean(p50) + 3 * statistics.stdev(p50) + p99UpperBound = statistics.mean(p99) + 3 * statistics.stdev(p99) + + baseline["latencyUpperBound"] = { + "P0": p0UpperBound, + "P50": p50UpperBound, + "P99": p99UpperBound + } + except: + print("Fatal error, expected P0, P50, P99 columns in test_run_results") + + max_result = 0 + max_result_commit = None + + for result in test_run_results: + Result = result[0] + commit = result[1] + if Result > max_result: + max_result = Result + max_result_commit = commit + + baseline["BestResult"] = max_result + baseline["BestResultCommit"] = max_result_commit + baseline["Noise"] = 0.2 # TODO: Once we aggregate enough data, we should run tests to see what is the tightest bound here. And build a more robust infra to change / invalidate this. + baseline["baseline"] = max_result * (1 - baseline["Noise"]) + return baseline + +def compute_baseline(test_run_results, test): + # Use a statistical approach to compute a baseline. + """ + test_run_results: [ + ... + ( Result: REAL, Secnetperf_latency_stats_ID?: *optional, INTEGER) + ... 
+ ] + + So if Secnetperf_latency_stats_ID is not NULL, then the test is a latency test, and the Result is RPS. + For computing the baseline of Results (which is ALWAYS a lower bound), we start with a simple approach: + + - Compute the average and subtract 2 standard deviations from the average. + - For datasets with high variance (but still somewhat normally distributed), this approach is robust enough to be used as a baseline. + """ + + results = [run_result[0] for run_result in test_run_results] + baseline = { + "baseline" : None + } + if "rps" in test: + # Compute upper bound for RPS as well. + try: + p0 = [run_result[2] for run_result in test_run_results] + p50 = [run_result[3] for run_result in test_run_results] + p99 = [run_result[4] for run_result in test_run_results] + + p0UpperBound = statistics.mean(p0) + 3 * statistics.stdev(p0) + p50UpperBound = statistics.mean(p50) + 3 * statistics.stdev(p50) + p99UpperBound = statistics.mean(p99) + 3 * statistics.stdev(p99) + + baseline["latencyUpperBound"] = { + "P0": p0UpperBound, + "P50": p50UpperBound, + "P99": p99UpperBound + } + except: + print("Fatal error, expected P0, P50, P99 columns in test_run_results") + mean = statistics.mean(results) + if len(results) < 2: + baseline["baseline"] = mean * 0.5 + return baseline + else: + std = statistics.stdev(results) + lowerbound = mean - 3 * std + baseline["baseline"] = lowerbound + return baseline + +def sliding_window(): + global conn + global cursor + + N = 20 # Number of most recent commit results to consider for each environment group. + + cursor.execute("SELECT Secnetperf_test_ID, Kernel_mode, Run_arguments FROM Secnetperf_tests") + + all_tests = cursor.fetchall() + + cursor.execute("SELECT OS_name, Architecture, Context FROM Environment GROUP BY OS_name, Architecture, Context") + + environment_groups = cursor.fetchall() + + regression_file = { + testid: {} for testid, _, _ in all_tests + } + + watermark_regression_file = { + testid: {} for testid, _, _ in all_tests + } + + for testid, _, _ in all_tests: + for io in ["wsk", "xdp", "epoll", "rio", "iocp"]: + for tls in ["openssl", "schannel"]: + for os_name, arch, context in environment_groups: + # NOTE: This SQL query makes the implicit assumption that Server environment ID = Client environment ID. + # If in the future we decide to test scenarios where we have Linux --> Windows... etc, this query will need to change. As well as a lot of our automation YAML as well. 
+ + if "rps" in testid: + # Rows fetched guaranteed to have latency results + cursor.execute( + f""" + SELECT AVG(Result), Secnetperf_test_runs.Secnetperf_commit, AVG(P0), AVG(P50), AVG(P99) FROM Secnetperf_test_runs + JOIN Secnetperf_latency_stats ON Secnetperf_test_runs.Secnetperf_latency_stats_ID = Secnetperf_latency_stats.Secnetperf_latency_stats_ID + JOIN Environment ON Secnetperf_test_runs.Client_Environment_ID = Environment.Environment_ID + JOIN Secnetperf_builds ON Secnetperf_test_runs.Secnetperf_commit = Secnetperf_builds.Secnetperf_commit + WHERE Secnetperf_test_runs.Secnetperf_test_ID = '{testid}' AND Secnetperf_test_runs.io = '{io}' AND Secnetperf_test_runs.tls = '{tls}' AND Environment.OS_name = '{os_name}' AND Environment.Architecture = '{arch}' AND Environment.Context = '{context}' + GROUP BY Secnetperf_test_runs.Secnetperf_commit + ORDER BY Build_date_time DESC, Secnetperf_test_runs.Run_date DESC + LIMIT {N} + """ + ) + else: + cursor.execute( + f""" + SELECT AVG(Result), Secnetperf_test_runs.Secnetperf_commit FROM Secnetperf_test_runs + JOIN Environment ON Secnetperf_test_runs.Client_Environment_ID = Environment.Environment_ID + JOIN Secnetperf_builds ON Secnetperf_test_runs.Secnetperf_commit = Secnetperf_builds.Secnetperf_commit + WHERE Secnetperf_test_runs.Secnetperf_test_ID = '{testid}' AND Secnetperf_test_runs.io = '{io}' AND Secnetperf_test_runs.tls = '{tls}' AND Environment.OS_name = '{os_name}' AND Environment.Architecture = '{arch}' AND Environment.Context = '{context}' + GROUP BY Secnetperf_test_runs.Secnetperf_commit + ORDER BY Build_date_time DESC, Secnetperf_test_runs.Run_date DESC + LIMIT {N} + """ + ) + data = cursor.fetchall() + if not data: + continue + regression_file[testid][f"{os_name}-{arch}-{context}-{io}-{tls}"] = compute_baseline(data, testid) + watermark_regression_file[testid][f"{os_name}-{arch}-{context}-{io}-{tls}"] = compute_baseline_watermark(data, testid) + + # Save results to a json file. + with open('regression.json', 'w') as f: + json.dump(regression_file, f, indent=4) + + with open('watermark_regression.json', 'w') as f: + json.dump(watermark_regression_file, f, indent=4) + + # Close connection + conn.close() + +if args.featureint == 1: + singular_datapoint_method() +elif args.featureint == 2: + sliding_window() +else: + print("Method not supported.") \ No newline at end of file diff --git a/pipeline/sql.py b/pipeline/sql.py new file mode 100644 index 00000000..521d91ad --- /dev/null +++ b/pipeline/sql.py @@ -0,0 +1,194 @@ +""" +This script does post-processing of test results, saving them to the database, doing some data cleaning in the meantime. 
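+It walks every result JSON in the working directory, derives the environment (context, OS name, architecture, tls, io) from the file name,
+and populates the Environment, Secnetperf_builds, Secnetperf_tests, Secnetperf_latency_stats and Secnetperf_test_runs tables.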
+""" + +import sqlite3 +import glob +import argparse +import json +import datetime +import os + +# Connect to the database +conn = sqlite3.connect('netperf.sqlite') +cursor = conn.cursor() + +# Create the parser +parser = argparse.ArgumentParser(description="Process a feature integer.") + +# Add an integer argument with a default value +parser.add_argument('--featureint', type=int, default=0, help='An integer number (default: 0).') + +# Parse the arguments +args = parser.parse_args() + +class Worker: + def __init__(self, cursor, production=False) -> None: + self.sql_statements_executed = [] + self.cursor = cursor + self.production = production + def execute(self, sql): + self.sql_statements_executed.append(sql) + if self.production: + self.cursor.execute(sql) + + def print_executed_statements(self): + for sql in self.sql_statements_executed: + print(sql) + print('-------------------------------') + +print("Dynamically executing SQL from the JSON...") +for json_file_path in glob.glob('*.json'): + if not os.path.exists(f"{json_file_path}/{json_file_path}"): + continue + with open(f"{json_file_path}/{json_file_path}", 'r') as json_file: + print("Processing file: {}".format(json_file_path)) + + # Grab data + json_content = json_file.read() + json_obj = json.loads(json_content) + commit = json_obj["commit"] + os_version = json_obj["os_version"] + parts = json_file_path.split("-") + assert len(parts) >= 8 + io = parts[-1] + io = io.split(".")[0] + tls = parts[-2] + arch = parts[-3] + os_name_2 = parts[-4] + os_name_1 = parts[-5] + os_name = os_name_1 + "-" + os_name_2 + context = parts[-6] # lab / azure + + # Check if Environment table has "SKU" column + cursor.execute("PRAGMA table_info(Environment)") + columns = cursor.fetchall() + sku_column_exists = False + for column in columns: + if column[1] == "SKU": + sku_column_exists = True + break + + # If it doesn't exists, add the column, and then initailize the data + if not sku_column_exists: + print("Extending the Environment table with the SKU column") + cursor.execute("SELECT * FROM Environment") + current_env_data = cursor.fetchall() + cursor.execute("ALTER TABLE Environment ADD COLUMN SKU TEXT") + for row in current_env_data: + sku = "" + if row[4] == "azure": + if "windows-2025" in row[1]: + sku = "F-Series(4vCPU, 8GiB RAM)" + elif "windows-2022" in row[1]: + sku = "io=wsk, F-Series(4vCPU, 8GiB RAM). io=iocp,xdp, Experimental_Boost4(4vCPU, 8GiB RAM)" + else: + sku = "Experimental_Boost4(4vCPU, 8GiB RAM)" + else: + sku = "Dell PowerEdge R650 (80 logical CPUs, 128GB RAM)" + print(f"Updating row with SKU: {sku}") + cursor.execute(f"""UPDATE Environment SET SKU = '{sku}' WHERE Environment_ID = {row[0]}""") + conn.commit() + + if "SKU" not in json_obj: + # Update Environments table + cursor.execute(f"""SELECT Environment_ID FROM Environment WHERE Architecture = '{arch}' AND OS_name = '{os_name}' AND OS_version = '{os_version}' AND Context = '{context}'""") + else: + cursor.execute(f"""SELECT Environment_ID FROM Environment WHERE Architecture = '{arch}' AND OS_name = '{os_name}' AND OS_version = '{os_version}' AND Context = '{context}' AND SKU = '{json_obj["SKU"]}'""") + + result = cursor.fetchall() + + if len(result) == 0: + if "SKU" in json_obj: + new_sku = json_obj["SKU"] + else: + if context == "azure": + if "windows-2025" in os_name: + new_sku = "F-Series(4vCPU, 8GiB RAM)" + elif "windows-2022" in os_name: + new_sku = "io=wsk, F-Series(4vCPU, 8GiB RAM). 
io=iocp,xdp, Experimental_Boost4(4vCPU, 8GiB RAM)" + else: + new_sku = "Experimental_Boost4(4vCPU, 8GiB RAM)" + else: + new_sku = "Dell PowerEdge R650 (80 logical CPUs, 128GB RAM)" + print('inserting new row with new environment') + cursor.execute(f"""INSERT INTO Environment (OS_name, OS_version, Architecture, Context, SKU) VALUES ('{os_name}', '{os_version}', '{arch}', '{context}', '{new_sku}')""") + conn.commit() + environment_id = cursor.lastrowid + else: + print('using existing environment') + environment_id = result[0][0] + # print(f"Environment ID: {environment_id}") + + worker = Worker(cursor=cursor, production=True) + + worker.execute(f""" + +INSERT OR REPLACE INTO Secnetperf_builds (Secnetperf_Commit, Build_date_time, TLS_enabled, Advanced_build_config) +VALUES ("{commit}", "{datetime.datetime.now()}", 1, "no special configurations."); + + """) + + if not os.path.exists("full_latencies"): + os.makedirs("full_latencies") + + for testid in json_obj: + if testid == "commit" or testid == "os_version" or "-lat" in testid or testid == "run_args" or "regression" in testid: + continue + + # truncate -tcp or -quic from testid + Testid = testid.split("-") + transport = Testid.pop() + Testid = "-".join(Testid) + extra_arg = " -tcp:1" if transport == "tcp" else " -tcp:0" + if "run_args" in json_obj: + run_args = json_obj["run_args"][Testid] + else: + run_args = "default_scenario_options" + worker.execute(f""" +INSERT OR IGNORE INTO Secnetperf_tests (Secnetperf_test_ID, Kernel_mode, Run_arguments) VALUES ("scenario-{testid}", 1, "{run_args + extra_arg}"); + """) + + if "latency" in testid or "rps" in testid: + + full_latency_curve_ids_to_save = {} + minimum_p0 = float('inf') + + # is a flattened 1D array of the form: [ first run + RPS, second run + RPS, third run + RPS..... ], ie. 
if each run has 8 values + RPS, then the array has 27 elements (8*3 + 3) + for offset in range(0, len(json_obj[testid]), 9): + p0 = float(json_obj[testid][offset]) + minimum_p0 = min(minimum_p0, p0) + # print(offset) + worker.execute(f""" +INSERT INTO Secnetperf_latency_stats (p0, p50, p90, p99, p999, p9999, p99999, p999999) +VALUES ({json_obj[testid][offset]}, {json_obj[testid][offset+1]}, {json_obj[testid][offset+2]}, {json_obj[testid][offset+3]}, {json_obj[testid][offset+4]}, {json_obj[testid][offset+5]}, {json_obj[testid][offset+6]}, {json_obj[testid][offset+7]}); +""") + last_row_inserted_id = worker.cursor.lastrowid + worker.execute(f""" +INSERT INTO Secnetperf_test_runs (Secnetperf_test_ID, Secnetperf_commit, Client_environment_ID, Server_environment_ID, Result, Secnetperf_latency_stats_ID, io, tls, Run_date) +VALUES ("scenario-{testid}", "{commit}", {environment_id}, {environment_id}, {json_obj[testid][offset+8]}, {last_row_inserted_id}, "{io}", "{tls}", "{datetime.datetime.now()}"); +""") + if testid + "-lat" in json_obj: + full_latency_curve_ids_to_save[last_row_inserted_id] = (p0, json_obj[testid + "-lat"][offset // 9]) + + for stats_id in full_latency_curve_ids_to_save: + p0_val, lat_curve = full_latency_curve_ids_to_save[stats_id] + if p0_val == minimum_p0: + print(f"Saving full latency curve for {testid} with p0 = {p0_val}") + with open(f"full_latencies/full_curve_{stats_id}.json", 'w') as f: + json.dump(lat_curve, f) + else: + for item in json_obj[testid]: + worker.execute(f""" +INSERT INTO Secnetperf_test_runs (Secnetperf_test_ID, Secnetperf_commit, Client_environment_ID, Server_environment_ID, Result, Secnetperf_latency_stats_ID, io, tls, Run_date) +VALUES ("scenario-{testid}", "{commit}", "{environment_id}", "{environment_id}", {item}, NULL, "{io}", "{tls}", "{datetime.datetime.now()}"); +""") + + # Commit changes + conn.commit() + + # dump SQL file for debugging + # worker.print_executed_statements() + +# Close connection +conn.close() From 6b371205785b2dfd8e48d0a02f3a13b09cbcece2 Mon Sep 17 00:00:00 2001 From: Jack He Date: Wed, 13 Nov 2024 17:34:55 -0800 Subject: [PATCH 2/2] python script bugfixes --- pipeline/generate_historical_data.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pipeline/generate_historical_data.py b/pipeline/generate_historical_data.py index 42682aff..e51a2947 100644 --- a/pipeline/generate_historical_data.py +++ b/pipeline/generate_historical_data.py @@ -149,7 +149,7 @@ def generate_tput_rps_hps_pages(cursor, all_secnetperf_tests, environment_groups continue if len(data) < HISTORY_LENGTH: print("We need more hps data! Resorting to archive.") - requires_archived_throughput_data = True + requires_archived_hps_data = True if not env_id_str in detailed_hps_page_json: detailed_hps_page_json[env_id_str] = { f"{test_id}" : { @@ -179,6 +179,9 @@ def generate_latency_page(cursor, all_secnetperf_tests, environment_groups, use_ if not requires_archived_latency_data and use_archive: continue + if "latency" not in test_id and "rps" not in test_id: + continue + for os_name, arch, context in environment_groups: for io in ["iocp", "epoll", "wsk", "xdp"]: for tls in ["schannel", "openssl"]: @@ -186,6 +189,7 @@ def generate_latency_page(cursor, all_secnetperf_tests, environment_groups, use_ if not data: continue if len(data) < HISTORY_LENGTH: + print("We need more latency data! Resorting to archive.") requires_archived_latency_data = True env_id_str = f"{os_name}-{arch}-{context}-{io}-{tls}" if not env_id_str in detailed_latency_page: