Commit 1f21026
[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
pre-commit-ci[bot] committed Jan 9, 2025
1 parent 6bb68c2 commit 1f21026
Showing 5 changed files with 84 additions and 79 deletions.
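The fixes below (double-quote normalization and line wrapping, import sorting, docstring and whitespace cleanup, end-of-file newlines) are the kind of output produced by black, isort, and the standard pre-commit-hooks, which pre-commit.ci applies and pushes back automatically. The repository's actual .pre-commit-config.yaml is not part of this commit, so the following is only a minimal sketch with assumed hook revisions (a docstring formatter hook would also be needed for the docstring changes shown further down):

repos:
  - repo: https://github.com/psf/black
    rev: 24.10.0  # assumed revision
    hooks:
      - id: black
  - repo: https://github.com/PyCQA/isort
    rev: 5.13.2  # assumed revision
    hooks:
      - id: isort
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v5.0.0  # assumed revision
    hooks:
      - id: end-of-file-fixer
      - id: trailing-whitespace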
27 changes: 15 additions & 12 deletions evals/benchmark/stresscli/commands/load_test.py
@@ -13,8 +13,8 @@
import yaml

from .report import export_testdata
from .utilization import MetricFetcher
from .utils import dump_k8s_config, generate_lua_script, generate_random_suffix
from .utilization import MetricFetcher

# Default load shape
DEFAULT_LOADSHAPE = "constant"
@@ -282,15 +282,15 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
collector = DockerMetricsCollector()
collect_metrics(collector, services, start_output_folder)

utilization_metric = global_settings.get('utilization-metric-collect', False)
utilization_metric = global_settings.get("utilization-metric-collect", False)
if utilization_metric:
services = global_settings.get('service-list') or []
metrics_endpoints = global_settings.get('utilization-metric-endpoint') or []
metrics_port = global_settings.get('utilization-metric-port', 9100)
metric_names = global_settings.get('utilization-metric-names') or []
services = global_settings.get("service-list") or []
metrics_endpoints = global_settings.get("utilization-metric-endpoint") or []
metrics_port = global_settings.get("utilization-metric-port", 9100)
metric_names = global_settings.get("utilization-metric-names") or []
fetcher = MetricFetcher(metrics_endpoints, metrics_port)
print(f"before start_collect_utilization:")
start_collect_utilization(fetcher,metric_names,namespace)
print("before start_collect_utilization:")
start_collect_utilization(fetcher, metric_names, namespace)

runspec["starttest_time"] = datetime.now().isoformat()
result = subprocess.run(cmd, capture_output=True, text=True)
@@ -300,7 +300,7 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
util_output_folder = os.path.join(output_folder, f"{index}_utilization")
print(f"before stop_collect_utilization: {util_output_folder}")
os.makedirs(util_output_folder, exist_ok=True)
stop_collect_utilization(fetcher,util_output_folder,metric_names)
stop_collect_utilization(fetcher, util_output_folder, metric_names)

if service_metric:
from .metrics_util import export_metric
@@ -320,21 +320,24 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
if runspec["deployment-type"] == "k8s":
dump_test_spec(kubeconfig, runspec, runspec["namespace"], output_folder, index)

def start_collect_utilization(fetcher,metric_names,namespace):

def start_collect_utilization(fetcher, metric_names, namespace):
# Start the MetricFetcher
fetcher.start(
metric_names,
namespace=namespace,
)

def stop_collect_utilization(fetcher,output_folder,metric_names):

def stop_collect_utilization(fetcher, output_folder, metric_names):
# Stop the MetricFetcher
fetcher.stop()
# Save results to a file
print(f"Calculated Average and Max per Container:")
print("Calculated Average and Max per Container:")
fetcher.save_results_to_file(output_folder)
print(f"after save_results_to_folder {output_folder}")


def dump_test_spec(kubeconfig, run, namespace, output_folder, index):
# Dump the k8s spec
k8s_spec = dump_k8s_config(kubeconfig, return_as_dict=True, namespace=namespace)
21 changes: 14 additions & 7 deletions evals/benchmark/stresscli/commands/report.py
@@ -17,14 +17,19 @@
CSV_SECTION_NAME = "stats.csv"
TESTSPEC_SECTION_NAME = "testspec.yaml"
METRICS_SECTION_NAME = "metrics.json"
UTILIZATION_SECTION_NAME = 'utilization.json'
UTILIZATION_SECTION_NAME = "utilization.json"


@click.command()
@click.option("--folder", type=click.Path(), required=True, help="Path to log folder")
@click.option("--format", default="plain_text", help="Output format, plain_text or csv, default is plain_text")
# @click.option('--include', default='testspec.yaml', help='Extract output data from output.log, stats.csv, and testspec.yaml, default is testspec.yaml')
@click.option('--transformeddata', type=bool, default=False, help='If transformedData is True, transpose the data to have metrics as columns.')
@click.option(
"--transformeddata",
type=bool,
default=False,
help="If transformedData is True, transpose the data to have metrics as columns.",
)
@click.option("-o", "--output", type=click.Path(), help="Save output to file")
@click.pass_context
def report(ctx, folder, format, output, transformeddata):
@@ -117,7 +122,7 @@ def export_testdata(testcase, folder, include="output.log|stats.csv|testspec.yam
csv_path = os.path.join(folder, f"{testcase}_stats.csv")
testspec_path = os.path.join(folder, f"{testcase}_testspec.yaml")
metrics_path = os.path.join(folder, f"{testcase}_metrics.json")
utilization_path=os.path.join(folder, f'{testcase}_utilization.json')
utilization_path = os.path.join(folder, f"{testcase}_utilization.json")
extracted_data = {}
if os.path.exists(csv_path):
if TESTSPEC_SECTION_NAME in include:
@@ -131,7 +136,7 @@ def export_testdata(testcase, folder, include="output.log|stats.csv|testspec.yam
if METRICS_SECTION_NAME in include and os.path.exists(metrics_path):
extract_json(extracted_data, metrics_path)
if UTILIZATION_SECTION_NAME in include and os.path.exists(utilization_path):
extract_utilization_json(extracted_data,utilization_path)
extract_utilization_json(extracted_data, utilization_path)
else:
print("Test failure, no data")
return extracted_data
@@ -193,15 +198,16 @@ def extract_json(extracted_data, json_file):
except Exception as e:
print(f"An error occurred: {e}")


def extract_utilization_json(extracted_data, json_file):
try:
with open(json_file, 'r') as file:
with open(json_file, "r") as file:
data = json.load(file)

deployment_metrics = data.get("deployment_metrics", {})

for key, value in deployment_metrics.items():
# print(f"Key: {key}, Value: {value}")
# print(f"Key: {key}, Value: {value}")
extracted_data[key] = value

except json.JSONDecodeError as e:
@@ -211,6 +217,7 @@ def extract_utilization_json(extracted_data, json_file):
except Exception as e:
print(f"An error occurred: {e}")


# Function to extract information based on keywords and patterns
def extract_stdout(extracted_data, log):
keywords = read_log_keywords("config.ini")
110 changes: 53 additions & 57 deletions evals/benchmark/stresscli/commands/utilization.py
@@ -1,22 +1,28 @@
import re
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import concurrent.futures
import csv
import json
import logging
import os
import re
import time
import requests
import logging
import json
import csv
import concurrent.futures
from prometheus_client.parser import text_string_to_metric_families
from collections import defaultdict
from threading import Thread, Event
from threading import Event, Thread

import requests
from kubernetes import client, config
from prometheus_client.parser import text_string_to_metric_families

# Configure CONSTANTS Value
METRIX_PREFIX="rdt_container_"
METRIX_PREFIX = "rdt_container_"
# Configure logging
log_level = 'ERROR' # Default log level
log_level = "ERROR" # Default log level
logging.basicConfig(level=getattr(logging, log_level))
logger = logging.getLogger(__name__)


class MetricFetcher:
def __init__(self, endpoints, port: int):
self.endpoints = endpoints
@@ -41,20 +47,21 @@ def get_all_endpints(self):
"node-name": pod.spec.node_name,
"endpoint": f"http://{pod.status.pod_ip}:{self.port}/metrics",
}
for pod in pods.items if "memory-bandwidth-exporter" in pod.metadata.name
for pod in pods.items
if "memory-bandwidth-exporter" in pod.metadata.name
]

def fetch_metrics(self, metric_names: list[str], namespace: str):
"""
Fetches metrics from the specified URL, filters by metric name and namespace, and stores them in the class.
"""
"""Fetches metrics from the specified URL, filters by metric name and namespace, and stores them in the class."""
if not self.endpoints:
self.get_all_endpints()
start_time = time.time() # Start timer for fetching metrics

def fetch_endpoint(endpoint):
response = requests.get(endpoint["endpoint"])
response.raise_for_status()
return endpoint["node-name"], response.text

with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(fetch_endpoint, endpoint): endpoint for endpoint in self.endpoints}
for future in concurrent.futures.as_completed(futures):
@@ -68,9 +75,7 @@ def fetch_endpoint(endpoint):
logger.debug(f"Time taken to fetch metrics: {fetch_duration:.2f} seconds")

def parse_metrics(self, metric_names: list[str], namespace: str):
"""
Parses metrics from the stored responses and stores them in the class.
"""
"""Parses metrics from the stored responses and stores them in the class."""
start_time = time.time()
for node_name, metrics_data_list in self.responses.items():
for metrics_data in metrics_data_list:
@@ -92,62 +97,61 @@ def parse_metrics(self, metric_names: list[str], namespace: str):
logger.debug(f"Time taken to parse metrics: {parse_duration:.2f} seconds")

def save_raw_metrics(self, output_folder: str):
"""
Saves the metrics to CSV files with node_name and metrics as granularity.
"""
"""Saves the metrics to CSV files with node_name and metrics as granularity."""
for node_name, metrics in self.metrics.items():
for metric_name, containers in metrics.items():
pmetric_name = metric_name
if pmetric_name.startswith(METRIX_PREFIX):
pmetric_name = pmetric_name[len(METRIX_PREFIX):]
pmetric_name = pmetric_name[len(METRIX_PREFIX) :]
filename = os.path.join(output_folder, f"{node_name}_{pmetric_name}.csv")
with open(filename, mode='w', newline='') as file:
with open(filename, mode="w", newline="") as file:
writer = csv.writer(file)
# Write header
header = ['container_id'] + [f"collect_num_{i}" for i in range(len(next(iter(containers.values()))))]
header = ["container_id"] + [
f"collect_num_{i}" for i in range(len(next(iter(containers.values()))))
]
writer.writerow(header)
# Write rows
for container_id, values in containers.items():
row = [container_id] + values
writer.writerow(row)
# Write sum row
sum_row = ['sum'] + [sum(values[i] for values in containers.values()) for i in range(len(next(iter(containers.values()))))]
sum_row = ["sum"] + [
sum(values[i] for values in containers.values())
for i in range(len(next(iter(containers.values()))))
]
writer.writerow(sum_row)
logger.info(f"Metrics saved to {filename}")

def save_summary_table(self, output_folder: str):
"""
Creates a summary table with container_id_podname as rows and metrics as columns.
"""
"""Creates a summary table with container_id_podname as rows and metrics as columns."""
for node_name, metrics in self.metrics.items():
summary_table = defaultdict(dict)
for metric_name, containers in metrics.items():
pmetric_name = metric_name
if pmetric_name.startswith(METRIX_PREFIX):
pmetric_name = pmetric_name[len(METRIX_PREFIX):]
pmetric_name = pmetric_name[len(METRIX_PREFIX) :]
filename = os.path.join(output_folder, f"{node_name}_{pmetric_name}.csv")
max_sum_index = self.get_max_sum_column_from_csv(filename)
with open(filename, mode='r') as file:
with open(filename, mode="r") as file:
reader = csv.reader(file)
header = next(reader) # Skip header
for row in reader:
if row[0] != 'sum':
if row[0] != "sum":
container_id = row[0]
pod_name = self.container_pod.get(container_id, "Unknown")
container_id_podname = f"{container_id}({pod_name})"
summary_table[container_id_podname][metric_name] = float(row[max_sum_index + 1])
self.save_summary_table_to_csv(summary_table, output_folder, node_name)

def get_max_sum_column_from_csv(self, filename: str) -> int:
"""
Reads a CSV file and returns the index of the column with the maximum sum value.
"""
with open(filename, mode='r') as file:
"""Reads a CSV file and returns the index of the column with the maximum sum value."""
with open(filename, mode="r") as file:
reader = csv.reader(file)
header = next(reader) # Skip header
sum_row = None
for row in reader:
if row[0] == 'sum':
if row[0] == "sum":
sum_row = row[1:]
break
if sum_row is None:
@@ -156,16 +160,16 @@ def get_max_sum_column_from_csv(self, filename: str) -> int:
return max_sum_index

def save_summary_table_to_csv(self, summary_table: dict[str, dict[str, float]], output_folder: str, node_name: str):
"""
Saves the summary table to a CSV file.
"""
"""Saves the summary table to a CSV file."""
filename = os.path.join(output_folder, f"{node_name}_sum_metrics_table.csv")
with open(filename, mode='w', newline='') as file:
with open(filename, mode="w", newline="") as file:
writer = csv.writer(file)
# Write header
metrics = list(next(iter(summary_table.values())).keys())
pmetrics = [metric[len(METRIX_PREFIX):] if metric.startswith(METRIX_PREFIX) else metric for metric in metrics]
header = ['containerid(podname)'] + pmetrics
pmetrics = [
metric[len(METRIX_PREFIX) :] if metric.startswith(METRIX_PREFIX) else metric for metric in metrics
]
header = ["containerid(podname)"] + pmetrics
writer.writerow(header)
# Write rows
for container_id_podname, metrics_values in summary_table.items():
@@ -183,49 +187,41 @@ def fetch_metrics_periodically(self, metric_names: list[str], namespace: str, in
print(f"metrics: {self.metrics}")

def start(self, metric_names: list[str], namespace: str, interval: int = 1):
"""
Starts the periodic fetching of metrics.
"""
"""Starts the periodic fetching of metrics."""
real_metrics = self.get_real_metrics(metric_names)
if self.thread is None or not self.thread.is_alive():
self.thread = Thread(target=self.fetch_metrics_periodically, args=(real_metrics, namespace, interval))
self.thread.start()
logger.info("MetricFetcher started.")

def stop(self):
"""
Stops the periodic fetching of metrics.
"""
"""Stops the periodic fetching of metrics."""
if self.thread is not None and self.thread.is_alive():
self.stop_event.set()
self.thread.join()
self.stop_event.clear()
logger.info("MetricFetcher stopped.")

def save_results_to_file(self, output_folder: str):
"""
Saves the calculated statistics to files.
"""
"""Saves the calculated statistics to files."""
self.save_raw_metrics(output_folder)
self.save_summary_table(output_folder)
logger.info(f"Results saved to: {output_folder}")


# Example usage
if __name__ == "__main__":
# Define the endpoint URL and result file path
metrics_endpoint = "http://172.21.195.64:9100/metrics" # Replace with your endpoint
result_file_path = "result.txt" # Replace with your desired result file path
metric_names=["cpu_utilization","memory","sum_local_memory_bandwidth","sum_total_memory_bandwidth"]
namespace="benchmarking"
metric_names = ["cpu_utilization", "memory", "sum_local_memory_bandwidth", "sum_total_memory_bandwidth"]
namespace = "benchmarking"
fetcher = MetricFetcher(metrics_endpoint)
# Start the MetricFetcher
fetcher.start(
metric_names,
namespace=namespace
)
fetcher.start(metric_names, namespace=namespace)
# Wait for some time
time.sleep(15)
# Stop the MetricFetcher
fetcher.stop()
# Save results to a file
fetcher.save_results_to_file(result_file_path)
fetcher.save_results_to_file(result_file_path)
3 changes: 1 addition & 2 deletions evals/benchmark/stresscli/requirements.txt
@@ -5,8 +5,7 @@ flask
kubernetes
locust
numpy
prometheus_client
pytest
pyyaml
requests
prometheus_client
kubernetes
2 changes: 1 addition & 1 deletion evals/benchmark/stresscli/run.yaml
@@ -21,7 +21,7 @@ profile:
- "chatqna-tgi"
- "chatqna-teirerank"
utilization-metric-collect: False
utilization-metric-endpoint:
utilization-metric-endpoint:
- node-name: "node1"
endpoint: "http://10.233.102.173:9101/metrics"
- node-name: "node2"
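For reference, load_test.py above reads utilization-metric-collect, service-list, utilization-metric-endpoint, utilization-metric-port, and utilization-metric-names from the global settings. A consolidated sketch of that block follows; the nesting, the node2 address, and the enabled flag are assumptions for illustration, with the metric names taken from the example in utilization.py and the port matching the default in load_test.py:

utilization-metric-collect: True  # set False (as in run.yaml above) to disable collection
service-list:
  - "chatqna-tgi"
  - "chatqna-teirerank"
utilization-metric-endpoint:
  - node-name: "node1"
    endpoint: "http://10.233.102.173:9101/metrics"
  - node-name: "node2"
    endpoint: "http://10.233.102.174:9101/metrics"  # assumed address
utilization-metric-port: 9100  # default used when the key is absent
utilization-metric-names:
  - "cpu_utilization"
  - "memory"
  - "sum_local_memory_bandwidth"
  - "sum_total_memory_bandwidth"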
