
Commit

Integrate with memory bandwidth exporter to support collection and reporting of memory bandwidth, CPU, and memory metrics.

Signed-off-by: Yugar-1 <[email protected]>
Yugar-1 committed Jan 9, 2025
1 parent b0ed6fc commit 84e7d38
Showing 6 changed files with 317 additions and 11 deletions.
15 changes: 8 additions & 7 deletions evals/benchmark/stresscli/README.md
@@ -85,13 +85,14 @@ Usage: stresscli.py report [OPTIONS]
Print the test report
Options:
--folder PATH Path to log folder [required]
--format TEXT Output format, plain_text or csv, default is plain_text
--include TEXT Extract output data from output.log, stats.csv, and
testspec.yaml, default is
output.log|stats.csv|testspec.yaml
-o, --output PATH Save output to file
--help Show this message and exit.
--folder PATH Path to log folder [required]
--format TEXT Output format, plain_text, html or csv, default
is plain_text
--transformeddata BOOLEAN If transformedData is True, transpose the data to
have metrics as columns.
--profile PATH Path to profile YAML file
-o, --output PATH Save output to file
--help Show this message and exit.
```
#### Dump the configuration

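The README hunk above documents the reworked `report` options. For illustration only, the new flags can also be exercised from Python through click's test runner; the import path below is an assumption about how the stresscli package is laid out, and the log folder is a placeholder that should contain stresscli output.

```
# Hypothetical driver for the updated report command using click's CliRunner.
# The import path and log folder are assumptions, not part of this commit.
from click.testing import CliRunner

from stresscli.commands.report import report

runner = CliRunner()
result = runner.invoke(
    report,
    ["--folder", "/path/to/logs", "--format", "csv", "--transformeddata", "true"],
)
print(result.exit_code)
print(result.output)
```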
33 changes: 31 additions & 2 deletions evals/benchmark/stresscli/commands/load_test.py
@@ -14,6 +14,7 @@

from .report import export_testdata
from .utils import dump_k8s_config, generate_lua_script, generate_random_suffix
from .utilization import MetricFetcher

# Default load shape
DEFAULT_LOADSHAPE = "constant"
@@ -180,8 +181,6 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
)
exit()

# csv_output = os.path.join(output_folder, runspec['run_name'])
# json_output = os.path.join(output_folder, f"{runspec['run_name']}_output.log")
csv_output = os.path.join(output_folder, f"{index}")
json_output = os.path.join(output_folder, f"{index}_output.log")

@@ -283,10 +282,26 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
collector = DockerMetricsCollector()
collect_metrics(collector, services, start_output_folder)

    utilization_metric = global_settings.get("utilization-metric-collect", False)
    if utilization_metric:
        services = global_settings.get("service-list") or []
        metrics_endpoints = global_settings.get("utilization-metric-endpoint") or []
        metrics_port = global_settings.get("utilization-metric-port", 9100)
        metric_names = global_settings.get("utilization-metric-names") or []
        fetcher = MetricFetcher(metrics_endpoints, metrics_port)
        start_collect_utilization(fetcher, metric_names, namespace)

runspec["starttest_time"] = datetime.now().isoformat()
result = subprocess.run(cmd, capture_output=True, text=True)
runspec["endtest_time"] = datetime.now().isoformat()

    if utilization_metric:
        util_output_folder = os.path.join(output_folder, f"{index}_utilization")
        os.makedirs(util_output_folder, exist_ok=True)
        stop_collect_utilization(fetcher, util_output_folder, metric_names)

if service_metric:
from .metrics_util import export_metric

@@ -305,6 +320,20 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
if runspec["deployment-type"] == "k8s":
dump_test_spec(kubeconfig, runspec, runspec["namespace"], output_folder, index)

def start_collect_utilization(fetcher, metric_names, namespace):
    # Start the MetricFetcher so it polls the exporter endpoints during the test run
    fetcher.start(
        metric_names,
        namespace=namespace,
    )

def stop_collect_utilization(fetcher, output_folder, metric_names):
    # Stop the MetricFetcher and save the collected per-container results
    fetcher.stop()
    fetcher.save_results_to_file(output_folder)

def dump_test_spec(kubeconfig, run, namespace, output_folder, index):
# Dump the k8s spec
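The hunk above wires utilization collection into `run_locust_test`: the `utilization-metric-*` settings configure a `MetricFetcher`, collection starts just before the locust subprocess runs and stops right after it, and the per-container results land in an `{index}_utilization` folder. A condensed, standalone sketch of that lifecycle follows; the settings values, paths, and the plain `utilization` import are illustrative assumptions, and running it requires a cluster that exposes the memory bandwidth exporter.

```
# Minimal sketch of the utilization-collection lifecycle added to load_test.py.
# Settings values and paths are placeholders; inside stresscli the import is
# the package-relative `from .utilization import MetricFetcher`.
import os
import time

from utilization import MetricFetcher

global_settings = {
    "utilization-metric-collect": True,
    "utilization-metric-endpoint": [],  # empty list: MetricFetcher discovers exporter pods itself
    "utilization-metric-port": 9100,
    "utilization-metric-names": ["cpu_utilization", "memory", "sum_total_memory_bandwidth"],
}
namespace = "benchmarking"

if global_settings["utilization-metric-collect"]:
    fetcher = MetricFetcher(
        global_settings["utilization-metric-endpoint"],
        global_settings["utilization-metric-port"],
    )
    # Start polling before the benchmark run
    fetcher.start(global_settings["utilization-metric-names"], namespace=namespace)

    time.sleep(30)  # stand-in for the locust subprocess.run(...) call

    # Stop polling and persist the per-container CSVs and summary table
    util_output_folder = os.path.join("output", "0_utilization")
    os.makedirs(util_output_folder, exist_ok=True)
    fetcher.stop()
    fetcher.save_results_to_file(util_output_folder)
```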
35 changes: 33 additions & 2 deletions evals/benchmark/stresscli/commands/report.py
@@ -17,24 +17,35 @@
CSV_SECTION_NAME = "stats.csv"
TESTSPEC_SECTION_NAME = "testspec.yaml"
METRICS_SECTION_NAME = "metrics.json"
UTILIZATION_SECTION_NAME = "utilization.json"


@click.command()
@click.option("--folder", type=click.Path(), required=True, help="Path to log folder")
@click.option("--format", default="plain_text", help="Output format, plain_text or csv, default is plain_text")
# @click.option('--include', default='testspec.yaml', help='Extract output data from output.log, stats.csv, and testspec.yaml, default is testspec.yaml')
@click.option("--transformeddata", type=bool, default=False, help="If true, transpose the report data so that metrics become columns.")
@click.option("-o", "--output", type=click.Path(), help="Save output to file")
@click.pass_context
def report(ctx, folder, format, output):
def report(ctx, folder, format, output, transformeddata):
"""Print the test report."""
output_data = {}
testcases = get_testcases(folder)
for testcase in testcases:
include = "|".join([TESTSPEC_SECTION_NAME, CSV_SECTION_NAME, METRICS_SECTION_NAME])
include = "|".join([TESTSPEC_SECTION_NAME, CSV_SECTION_NAME, METRICS_SECTION_NAME, UTILIZATION_SECTION_NAME])
extracted_data = export_testdata(testcase, folder, include)
if extracted_data:
output_data[testcase] = extracted_data

if transformeddata:
transformed_output_data = {}
for key, value in output_data.items():
for k, v in value.items():
if k not in transformed_output_data:
transformed_output_data[k] = {}
transformed_output_data[k][key] = v
output_data = transformed_output_data

if format == "plain_text":
if output:
with open(output, "w") as f:
@@ -106,6 +117,7 @@ def export_testdata(testcase, folder, include="output.log|stats.csv|testspec.yam
csv_path = os.path.join(folder, f"{testcase}_stats.csv")
testspec_path = os.path.join(folder, f"{testcase}_testspec.yaml")
metrics_path = os.path.join(folder, f"{testcase}_metrics.json")
    utilization_path = os.path.join(folder, f"{testcase}_utilization.json")
extracted_data = {}
if os.path.exists(csv_path):
if TESTSPEC_SECTION_NAME in include:
@@ -118,6 +130,8 @@ def export_testdata(testcase, folder, include="output.log|stats.csv|testspec.yam
extract_stdout(extracted_data, log_data)
if METRICS_SECTION_NAME in include and os.path.exists(metrics_path):
extract_json(extracted_data, metrics_path)
        if UTILIZATION_SECTION_NAME in include and os.path.exists(utilization_path):
            extract_utilization_json(extracted_data, utilization_path)
else:
print("Test failure, no data")
return extracted_data
@@ -179,6 +193,23 @@ def extract_json(extracted_data, json_file):
except Exception as e:
print(f"An error occurred: {e}")

def extract_utilization_json(extracted_data, json_file):
    """Extract the deployment_metrics entries from a <testcase>_utilization.json file."""
    try:
        with open(json_file, "r") as file:
            data = json.load(file)

        deployment_metrics = data.get("deployment_metrics", {})
        for key, value in deployment_metrics.items():
            extracted_data[key] = value

    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
    except FileNotFoundError:
        print("The specified file was not found.")
    except Exception as e:
        print(f"An error occurred: {e}")

# Function to extract information based on keywords and patterns
def extract_stdout(extracted_data, log):
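The new `--transformeddata` flag transposes the assembled report so that each metric becomes a column keyed by testcase, instead of each testcase holding its own metric dictionary. The snippet below reproduces that transposition on a tiny report dictionary to show the before and after shapes; the metric names and numbers are made up for the example.

```
# Self-contained illustration of the --transformeddata transposition.
output_data = {
    "1": {"RPS": 12.5, "rdt_container_cpu_utilization": 3.1},
    "2": {"RPS": 14.0, "rdt_container_cpu_utilization": 3.4},
}

transformed_output_data = {}
for testcase, metrics in output_data.items():
    for metric, value in metrics.items():
        transformed_output_data.setdefault(metric, {})[testcase] = value

print(transformed_output_data)
# {'RPS': {'1': 12.5, '2': 14.0}, 'rdt_container_cpu_utilization': {'1': 3.1, '2': 3.4}}
```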
231 changes: 231 additions & 0 deletions evals/benchmark/stresscli/commands/utilization.py
@@ -0,0 +1,231 @@
import concurrent.futures
import csv
import json
import logging
import os
import re
import time
from collections import defaultdict
from threading import Event, Thread

import requests
from kubernetes import client, config
from prometheus_client.parser import text_string_to_metric_families

# Prefix that the memory bandwidth exporter adds to every container-level metric name
METRIX_PREFIX = "rdt_container_"

# Configure logging
log_level = "ERROR"  # Default log level
logging.basicConfig(level=getattr(logging, log_level))
logger = logging.getLogger(__name__)

class MetricFetcher:
def __init__(self, endpoints, port: int):
self.endpoints = endpoints
self.port = port
self.responses = defaultdict(list)
self.metrics = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
self.container_pod = defaultdict(str)
self.stop_event = Event()
self.thread = None

    def get_real_metrics(self, metric_names: list[str]):
        # Prepend METRIX_PREFIX to each metric name so it matches the names
        # exposed by the memory bandwidth exporter
        metric_names = [f"{METRIX_PREFIX}{metric_name}" for metric_name in metric_names]
        return metric_names

    def get_all_endpoints(self):
        # Discover the memory-bandwidth-exporter pods in the cluster and build
        # the list of /metrics endpoints to scrape
        config.load_kube_config()
        v1 = client.CoreV1Api()
        pods = v1.list_pod_for_all_namespaces(watch=False)
        self.endpoints = [
            {
                "node-name": pod.spec.node_name,
                "endpoint": f"http://{pod.status.pod_ip}:{self.port}/metrics",
            }
            for pod in pods.items
            if "memory-bandwidth-exporter" in pod.metadata.name
        ]

def fetch_metrics(self, metric_names: list[str], namespace: str):
"""
Fetches metrics from the specified URL, filters by metric name and namespace, and stores them in the class.
"""
        if not self.endpoints:
            self.get_all_endpoints()
start_time = time.time() # Start timer for fetching metrics
def fetch_endpoint(endpoint):
response = requests.get(endpoint["endpoint"])
response.raise_for_status()
return endpoint["node-name"], response.text
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(fetch_endpoint, endpoint): endpoint for endpoint in self.endpoints}
for future in concurrent.futures.as_completed(futures):
endpoint = futures[future]
try:
node_name, response_text = future.result()
self.responses[node_name].append(response_text)
except Exception as e:
logger.error(f"Error fetching metrics from {endpoint['endpoint']}: {e}")
fetch_duration = time.time() - start_time # Calculate duration
logger.debug(f"Time taken to fetch metrics: {fetch_duration:.2f} seconds")

def parse_metrics(self, metric_names: list[str], namespace: str):
"""
Parses metrics from the stored responses and stores them in the class.
"""
start_time = time.time()
for node_name, metrics_data_list in self.responses.items():
for metrics_data in metrics_data_list:
for family in text_string_to_metric_families(metrics_data):
for sample in family.samples:
metric_name = sample[0]
labels = sample[1]
value = sample[2]

# Check if the metric name and namespace match
if metric_name in metric_names and labels.get("nameSpace") == namespace:
container_id = labels.get("containerId")
pod_name = labels.get("podName")
if container_id not in self.metrics[node_name][metric_name]:
self.metrics[node_name][metric_name][container_id] = []
self.metrics[node_name][metric_name][container_id].append(value)
self.container_pod[container_id] = pod_name
parse_duration = time.time() - start_time # Calculate duration
logger.debug(f"Time taken to parse metrics: {parse_duration:.2f} seconds")

def save_raw_metrics(self, output_folder: str):
"""
Saves the metrics to CSV files with node_name and metrics as granularity.
"""
for node_name, metrics in self.metrics.items():
for metric_name, containers in metrics.items():
pmetric_name = metric_name
if pmetric_name.startswith(METRIX_PREFIX):
pmetric_name = pmetric_name[len(METRIX_PREFIX):]
filename = os.path.join(output_folder, f"{node_name}_{pmetric_name}.csv")
with open(filename, mode='w', newline='') as file:
writer = csv.writer(file)
# Write header
header = ['container_id'] + [f"collect_num_{i}" for i in range(len(next(iter(containers.values()))))]
writer.writerow(header)
# Write rows
for container_id, values in containers.items():
row = [container_id] + values
writer.writerow(row)
# Write sum row
sum_row = ['sum'] + [sum(values[i] for values in containers.values()) for i in range(len(next(iter(containers.values()))))]
writer.writerow(sum_row)
logger.info(f"Metrics saved to {filename}")

def save_summary_table(self, output_folder: str):
"""
Creates a summary table with container_id_podname as rows and metrics as columns.
"""
for node_name, metrics in self.metrics.items():
summary_table = defaultdict(dict)
for metric_name, containers in metrics.items():
pmetric_name = metric_name
if pmetric_name.startswith(METRIX_PREFIX):
pmetric_name = pmetric_name[len(METRIX_PREFIX):]
filename = os.path.join(output_folder, f"{node_name}_{pmetric_name}.csv")
max_sum_index = self.get_max_sum_column_from_csv(filename)
with open(filename, mode='r') as file:
reader = csv.reader(file)
header = next(reader) # Skip header
for row in reader:
if row[0] != 'sum':
container_id = row[0]
pod_name = self.container_pod.get(container_id, "Unknown")
container_id_podname = f"{container_id}({pod_name})"
summary_table[container_id_podname][metric_name] = float(row[max_sum_index + 1])
self.save_summary_table_to_csv(summary_table, output_folder, node_name)

def get_max_sum_column_from_csv(self, filename: str) -> int:
"""
Reads a CSV file and returns the index of the column with the maximum sum value.
"""
with open(filename, mode='r') as file:
reader = csv.reader(file)
header = next(reader) # Skip header
sum_row = None
for row in reader:
if row[0] == 'sum':
sum_row = row[1:]
break
if sum_row is None:
raise ValueError(f"No sum row found in {filename}")
max_sum_index = max(range(len(sum_row)), key=lambda i: float(sum_row[i]))
return max_sum_index

def save_summary_table_to_csv(self, summary_table: dict[str, dict[str, float]], output_folder: str, node_name: str):
"""
Saves the summary table to a CSV file.
"""
filename = os.path.join(output_folder, f"{node_name}_sum_metrics_table.csv")
with open(filename, mode='w', newline='') as file:
writer = csv.writer(file)
# Write header
metrics = list(next(iter(summary_table.values())).keys())
pmetrics = [metric[len(METRIX_PREFIX):] if metric.startswith(METRIX_PREFIX) else metric for metric in metrics]
header = ['containerid(podname)'] + pmetrics
writer.writerow(header)
# Write rows
for container_id_podname, metrics_values in summary_table.items():
row = [container_id_podname] + [metrics_values[metric] for metric in metrics]
writer.writerow(row)

    def fetch_metrics_periodically(self, metric_names: list[str], namespace: str, interval: int):
        # Poll all endpoints until stop() is requested, then parse everything collected
        while not self.stop_event.is_set():
            self.fetch_metrics(metric_names, namespace)
            self.stop_event.wait(interval)
        for node, values in self.responses.items():
            logger.debug(f"node name: {node}, number of responses: {len(values)}")
        self.parse_metrics(metric_names, namespace)
        logger.debug(f"parsed metrics: {self.metrics}")

def start(self, metric_names: list[str], namespace: str, interval: int = 1):
"""
Starts the periodic fetching of metrics.
"""
real_metrics = self.get_real_metrics(metric_names)
if self.thread is None or not self.thread.is_alive():
self.thread = Thread(target=self.fetch_metrics_periodically, args=(real_metrics, namespace, interval))
self.thread.start()
logger.info("MetricFetcher started.")

def stop(self):
"""
Stops the periodic fetching of metrics.
"""
if self.thread is not None and self.thread.is_alive():
self.stop_event.set()
self.thread.join()
self.stop_event.clear()
logger.info("MetricFetcher stopped.")

def save_results_to_file(self, output_folder: str):
"""
Saves the calculated statistics to files.
"""
self.save_raw_metrics(output_folder)
self.save_summary_table(output_folder)
logger.info(f"Results saved to: {output_folder}")

# Example usage
if __name__ == "__main__":
    # Define the exporter endpoints and the folder for the result files
    metrics_endpoints = [
        {"node-name": "node1", "endpoint": "http://172.21.195.64:9100/metrics"}  # Replace with your node and endpoint
    ]
    metrics_port = 9100
    output_folder = "results"  # Replace with your desired output folder
    metric_names = ["cpu_utilization", "memory", "sum_local_memory_bandwidth", "sum_total_memory_bandwidth"]
    namespace = "benchmarking"
    fetcher = MetricFetcher(metrics_endpoints, metrics_port)
    # Start the MetricFetcher
    fetcher.start(
        metric_names,
        namespace=namespace,
    )
    # Wait for some time
    time.sleep(15)
    # Stop the MetricFetcher
    fetcher.stop()
    # Save results to files
    os.makedirs(output_folder, exist_ok=True)
    fetcher.save_results_to_file(output_folder)
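`MetricFetcher.parse_metrics` keeps only samples whose name carries the `rdt_container_` prefix and whose `nameSpace` label matches the benchmark namespace, then groups values by node, metric, and `containerId`/`podName`. The sketch below shows the kind of Prometheus exposition text the fetcher expects and how the same `prometheus_client` parser surfaces those labels; the sample lines and values are invented for illustration.

```
# Hedged sketch of exporter output that parse_metrics can consume.
# The sample text is illustrative, not captured from a real exporter.
from prometheus_client.parser import text_string_to_metric_families

sample_text = """\
rdt_container_cpu_utilization{nameSpace="benchmarking",containerId="abc123",podName="tgi-0"} 2.5
rdt_container_sum_total_memory_bandwidth{nameSpace="benchmarking",containerId="abc123",podName="tgi-0"} 1.2e9
"""

for family in text_string_to_metric_families(sample_text):
    for sample in family.samples:
        if sample.name.startswith("rdt_container_") and sample.labels.get("nameSpace") == "benchmarking":
            print(sample.name, sample.labels["containerId"], sample.labels["podName"], sample.value)
```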
