From 84e7d38d069fcc6395aedba1d375388580e07c7c Mon Sep 17 00:00:00 2001
From: Yugar-1
Date: Thu, 9 Jan 2025 07:04:23 +0000
Subject: [PATCH] Integrate with the memory bandwidth exporter to support
 collection and reporting of memory bandwidth, CPU, and memory metrics.

Signed-off-by: Yugar-1
---
 evals/benchmark/stresscli/README.md           |  15 +-
 .../benchmark/stresscli/commands/load_test.py |  33 ++-
 evals/benchmark/stresscli/commands/report.py  |  35 ++-
 .../stresscli/commands/utilization.py         | 231 ++++++++++++++++++
 evals/benchmark/stresscli/requirements.txt    |   2 +
 evals/benchmark/stresscli/run.yaml            |  12 +
 6 files changed, 317 insertions(+), 11 deletions(-)
 create mode 100644 evals/benchmark/stresscli/commands/utilization.py

diff --git a/evals/benchmark/stresscli/README.md b/evals/benchmark/stresscli/README.md
index 2cc7fc98..736a6583 100644
--- a/evals/benchmark/stresscli/README.md
+++ b/evals/benchmark/stresscli/README.md
@@ -85,13 +85,14 @@ Usage: stresscli.py report [OPTIONS]
   Print the test report
 
 Options:
-  --folder PATH      Path to log folder [required]
-  --format TEXT      Output format, plain_text or csv, default is plain_text
-  --include TEXT     Extract output data from output.log, stats.csv, and
-                     testspec.yaml, default is
-                     output.log|stats.csv|testspec.yaml
-  -o, --output PATH  Save output to file
-  --help             Show this message and exit.
+  --folder PATH              Path to log folder [required]
+  --format TEXT              Output format, plain_text, html or csv, default
+                             is plain_text
+  --transformeddata BOOLEAN  If true, transpose the report data to have
+                             metrics as columns.
+  --profile PATH             Path to profile YAML file
+  -o, --output PATH          Save output to file
+  --help                     Show this message and exit.
 ```
 
 #### Dump the configuration
diff --git a/evals/benchmark/stresscli/commands/load_test.py b/evals/benchmark/stresscli/commands/load_test.py
index ec24ac30..cbfbdd59 100644
--- a/evals/benchmark/stresscli/commands/load_test.py
+++ b/evals/benchmark/stresscli/commands/load_test.py
@@ -14,6 +14,7 @@
 
 from .report import export_testdata
 from .utils import dump_k8s_config, generate_lua_script, generate_random_suffix
+from .utilization import MetricFetcher
 
 # Default load shape
 DEFAULT_LOADSHAPE = "constant"
@@ -180,8 +181,6 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
         )
         exit()
 
-    # csv_output = os.path.join(output_folder, runspec['run_name'])
-    # json_output = os.path.join(output_folder, f"{runspec['run_name']}_output.log")
     csv_output = os.path.join(output_folder, f"{index}")
     json_output = os.path.join(output_folder, f"{index}_output.log")
 
@@ -283,10 +282,26 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
         collector = DockerMetricsCollector()
         collect_metrics(collector, services, start_output_folder)
 
+    utilization_metric = global_settings.get("utilization-metric-collect", False)
+    if utilization_metric:
+        services = global_settings.get("service-list") or []
+        metrics_endpoints = global_settings.get("utilization-metric-endpoint") or []
+        metrics_port = global_settings.get("utilization-metric-port", 9100)
+        metric_names = global_settings.get("utilization-metric-names") or []
+        fetcher = MetricFetcher(metrics_endpoints, metrics_port)
+        print("Starting utilization metric collection")
+        start_collect_utilization(fetcher, metric_names, namespace)
+
     runspec["starttest_time"] = datetime.now().isoformat()
     result = subprocess.run(cmd, capture_output=True, text=True)
     runspec["endtest_time"] = datetime.now().isoformat()
 
+    if utilization_metric:
+        util_output_folder = os.path.join(output_folder, f"{index}_utilization")
+        print(f"Stopping utilization metric collection, saving results to {util_output_folder}")
+        os.makedirs(util_output_folder, exist_ok=True)
+        stop_collect_utilization(fetcher, util_output_folder, metric_names)
+
     if service_metric:
         from .metrics_util import export_metric
 
@@ -305,6 +320,20 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
     if runspec["deployment-type"] == "k8s":
         dump_test_spec(kubeconfig, runspec, runspec["namespace"], output_folder, index)
 
+def start_collect_utilization(fetcher, metric_names, namespace):
+    # Start the MetricFetcher
+    fetcher.start(
+        metric_names,
+        namespace=namespace,
+    )
+
+def stop_collect_utilization(fetcher, output_folder, metric_names):
+    # Stop the MetricFetcher and save its results
+    fetcher.stop()
+    print("Calculated average and max per container:")
+    fetcher.save_results_to_file(output_folder)
+    print(f"Utilization results saved to {output_folder}")
 
 def dump_test_spec(kubeconfig, run, namespace, output_folder, index):
     # Dump the k8s spec
diff --git a/evals/benchmark/stresscli/commands/report.py b/evals/benchmark/stresscli/commands/report.py
index ae334aaa..8d22c2ab 100644
--- a/evals/benchmark/stresscli/commands/report.py
+++ b/evals/benchmark/stresscli/commands/report.py
@@ -17,24 +17,35 @@
 CSV_SECTION_NAME = "stats.csv"
 TESTSPEC_SECTION_NAME = "testspec.yaml"
 METRICS_SECTION_NAME = "metrics.json"
+UTILIZATION_SECTION_NAME = "utilization.json"
 
 
 @click.command()
 @click.option("--folder", type=click.Path(), required=True, help="Path to log folder")
 @click.option("--format", default="plain_text", help="Output format, plain_text or csv, default is plain_text")
 # @click.option('--include', default='testspec.yaml', help='Extract output data from output.log, stats.csv, and testspec.yaml, default is testspec.yaml')
+@click.option("--transformeddata", type=bool, default=False, help="If true, transpose the report data to have metrics as columns.")
 @click.option("-o", "--output", type=click.Path(), help="Save output to file")
 @click.pass_context
-def report(ctx, folder, format, output):
+def report(ctx, folder, format, output, transformeddata):
     """Print the test report."""
     output_data = {}
     testcases = get_testcases(folder)
     for testcase in testcases:
-        include = "|".join([TESTSPEC_SECTION_NAME, CSV_SECTION_NAME, METRICS_SECTION_NAME])
+        include = "|".join([TESTSPEC_SECTION_NAME, CSV_SECTION_NAME, METRICS_SECTION_NAME, UTILIZATION_SECTION_NAME])
         extracted_data = export_testdata(testcase, folder, include)
         if extracted_data:
             output_data[testcase] = extracted_data
 
+    if transformeddata:
+        transformed_output_data = {}
+        for key, value in output_data.items():
+            for k, v in value.items():
+                if k not in transformed_output_data:
+                    transformed_output_data[k] = {}
+                transformed_output_data[k][key] = v
+        output_data = transformed_output_data
+
     if format == "plain_text":
         if output:
             with open(output, "w") as f:
@@ -106,6 +117,7 @@ def export_testdata(testcase, folder, include="output.log|stats.csv|testspec.yam
     csv_path = os.path.join(folder, f"{testcase}_stats.csv")
     testspec_path = os.path.join(folder, f"{testcase}_testspec.yaml")
     metrics_path = os.path.join(folder, f"{testcase}_metrics.json")
+    utilization_path = os.path.join(folder, f"{testcase}_utilization.json")
     extracted_data = {}
     if os.path.exists(csv_path):
         if TESTSPEC_SECTION_NAME in include:
@@ -118,6 +130,8 @@ def export_testdata(testcase, folder, include="output.log|stats.csv|testspec.yam
             extract_stdout(extracted_data, log_data)
         if METRICS_SECTION_NAME in include and os.path.exists(metrics_path):
             extract_json(extracted_data, metrics_path)
+        if UTILIZATION_SECTION_NAME in include and os.path.exists(utilization_path):
+            extract_utilization_json(extracted_data, utilization_path)
     else:
         print("Test failure, no data")
     return extracted_data
@@ -179,6 +193,23 @@ def extract_json(extracted_data, json_file):
     except Exception as e:
         print(f"An error occurred: {e}")
 
+def extract_utilization_json(extracted_data, json_file):
+    try:
+        with open(json_file, 'r') as file:
+            data = json.load(file)
+
+        deployment_metrics = data.get("deployment_metrics", {})
+
+        for key, value in deployment_metrics.items():
+            # Copy each deployment metric into the report data
+            extracted_data[key] = value
+
+    except json.JSONDecodeError as e:
+        print(f"Error decoding JSON: {e}")
+    except FileNotFoundError:
+        print("The specified file was not found.")
+    except Exception as e:
+        print(f"An error occurred: {e}")
 
 # Function to extract information based on keywords and patterns
 def extract_stdout(extracted_data, log):
diff --git a/evals/benchmark/stresscli/commands/utilization.py b/evals/benchmark/stresscli/commands/utilization.py
new file mode 100644
index 00000000..d8c29e86
--- /dev/null
+++ b/evals/benchmark/stresscli/commands/utilization.py
@@ -0,0 +1,231 @@
+import re
+import os
+import time
+import requests
+import logging
+import json
+import csv
+import concurrent.futures
+from prometheus_client.parser import text_string_to_metric_families
+from collections import defaultdict
+from threading import Thread, Event
+from kubernetes import client, config
+# Constants
+METRIC_PREFIX = "rdt_container_"
+# Configure logging
+log_level = 'ERROR'  # Default log level
+logging.basicConfig(level=getattr(logging, log_level))
+logger = logging.getLogger(__name__)
+
+class MetricFetcher:
+    def __init__(self, endpoints, port: int):
+        self.endpoints = endpoints
+        self.port = port
+        self.responses = defaultdict(list)
+        self.metrics = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
+        self.container_pod = defaultdict(str)
+        self.stop_event = Event()
+        self.thread = None
+
+    def get_real_metrics(self, metric_names: list[str]):
+        # Prepend METRIC_PREFIX to every requested metric name
+        metric_names = [f"{METRIC_PREFIX}{metric_name}" for metric_name in metric_names]
+        return metric_names
+
+    def get_all_endpoints(self):
+        config.load_kube_config()
+        v1 = client.CoreV1Api()
+        pods = v1.list_pod_for_all_namespaces(watch=False)
+        self.endpoints = [
+            {
+                "node-name": pod.spec.node_name,
+                "endpoint": f"http://{pod.status.pod_ip}:{self.port}/metrics",
+            }
+            for pod in pods.items if "memory-bandwidth-exporter" in pod.metadata.name
+        ]
+
+    def fetch_metrics(self, metric_names: list[str], namespace: str):
+        """
+        Fetches metrics from the configured endpoints, filters by metric name and namespace, and stores them in the class.
+        """
+        if not self.endpoints:
+            self.get_all_endpoints()
+        start_time = time.time()  # Start timer for fetching metrics
+        def fetch_endpoint(endpoint):
+            response = requests.get(endpoint["endpoint"])
+            response.raise_for_status()
+            return endpoint["node-name"], response.text
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            futures = {executor.submit(fetch_endpoint, endpoint): endpoint for endpoint in self.endpoints}
+            for future in concurrent.futures.as_completed(futures):
+                endpoint = futures[future]
+                try:
+                    node_name, response_text = future.result()
+                    self.responses[node_name].append(response_text)
+                except Exception as e:
+                    logger.error(f"Error fetching metrics from {endpoint['endpoint']}: {e}")
+        fetch_duration = time.time() - start_time  # Calculate duration
+        logger.debug(f"Time taken to fetch metrics: {fetch_duration:.2f} seconds")
+
+    def parse_metrics(self, metric_names: list[str], namespace: str):
+        """
+        Parses metrics from the stored responses and stores them in the class.
+        """
+        start_time = time.time()
+        for node_name, metrics_data_list in self.responses.items():
+            for metrics_data in metrics_data_list:
+                for family in text_string_to_metric_families(metrics_data):
+                    for sample in family.samples:
+                        metric_name = sample[0]
+                        labels = sample[1]
+                        value = sample[2]
+
+                        # Check if the metric name and namespace match
+                        if metric_name in metric_names and labels.get("nameSpace") == namespace:
+                            container_id = labels.get("containerId")
+                            pod_name = labels.get("podName")
+                            if container_id not in self.metrics[node_name][metric_name]:
+                                self.metrics[node_name][metric_name][container_id] = []
+                            self.metrics[node_name][metric_name][container_id].append(value)
+                            self.container_pod[container_id] = pod_name
+        parse_duration = time.time() - start_time  # Calculate duration
+        logger.debug(f"Time taken to parse metrics: {parse_duration:.2f} seconds")
+
+    def save_raw_metrics(self, output_folder: str):
+        """
+        Saves the raw metrics to CSV files, one file per node and metric.
+        """
+        for node_name, metrics in self.metrics.items():
+            for metric_name, containers in metrics.items():
+                pmetric_name = metric_name
+                if pmetric_name.startswith(METRIC_PREFIX):
+                    pmetric_name = pmetric_name[len(METRIC_PREFIX):]
+                filename = os.path.join(output_folder, f"{node_name}_{pmetric_name}.csv")
+                with open(filename, mode='w', newline='') as file:
+                    writer = csv.writer(file)
+                    # Write header
+                    header = ['container_id'] + [f"collect_num_{i}" for i in range(len(next(iter(containers.values()))))]
+                    writer.writerow(header)
+                    # Write rows
+                    for container_id, values in containers.items():
+                        row = [container_id] + values
+                        writer.writerow(row)
+                    # Write sum row
+                    sum_row = ['sum'] + [sum(values[i] for values in containers.values()) for i in range(len(next(iter(containers.values()))))]
+                    writer.writerow(sum_row)
+                logger.info(f"Metrics saved to {filename}")
+
+    def save_summary_table(self, output_folder: str):
+        """
+        Creates a summary table with container_id_podname as rows and metrics as columns.
+        """
+        for node_name, metrics in self.metrics.items():
+            summary_table = defaultdict(dict)
+            for metric_name, containers in metrics.items():
+                pmetric_name = metric_name
+                if pmetric_name.startswith(METRIC_PREFIX):
+                    pmetric_name = pmetric_name[len(METRIC_PREFIX):]
+                filename = os.path.join(output_folder, f"{node_name}_{pmetric_name}.csv")
+                max_sum_index = self.get_max_sum_column_from_csv(filename)
+                with open(filename, mode='r') as file:
+                    reader = csv.reader(file)
+                    header = next(reader)  # Skip header
+                    for row in reader:
+                        if row[0] != 'sum':
+                            container_id = row[0]
+                            pod_name = self.container_pod.get(container_id, "Unknown")
+                            container_id_podname = f"{container_id}({pod_name})"
+                            summary_table[container_id_podname][metric_name] = float(row[max_sum_index + 1])
+            self.save_summary_table_to_csv(summary_table, output_folder, node_name)
+
+    def get_max_sum_column_from_csv(self, filename: str) -> int:
+        """
+        Reads a CSV file and returns the index of the column with the maximum sum value.
+        """
+        with open(filename, mode='r') as file:
+            reader = csv.reader(file)
+            header = next(reader)  # Skip header
+            sum_row = None
+            for row in reader:
+                if row[0] == 'sum':
+                    sum_row = row[1:]
+                    break
+            if sum_row is None:
+                raise ValueError(f"No sum row found in {filename}")
+            max_sum_index = max(range(len(sum_row)), key=lambda i: float(sum_row[i]))
+            return max_sum_index
+
+    def save_summary_table_to_csv(self, summary_table: dict[str, dict[str, float]], output_folder: str, node_name: str):
+        """
+        Saves the summary table to a CSV file.
+        """
+        filename = os.path.join(output_folder, f"{node_name}_sum_metrics_table.csv")
+        with open(filename, mode='w', newline='') as file:
+            writer = csv.writer(file)
+            # Write header
+            metrics = list(next(iter(summary_table.values())).keys())
+            pmetrics = [metric[len(METRIC_PREFIX):] if metric.startswith(METRIC_PREFIX) else metric for metric in metrics]
+            header = ['containerid(podname)'] + pmetrics
+            writer.writerow(header)
+            # Write rows
+            for container_id_podname, metrics_values in summary_table.items():
+                row = [container_id_podname] + [metrics_values[metric] for metric in metrics]
+                writer.writerow(row)
+
+    def fetch_metrics_periodically(self, metric_names: list[str], namespace: str, interval: int):
+        while not self.stop_event.is_set():
+            self.fetch_metrics(metric_names, namespace)
+            self.stop_event.wait(interval)
+        for node, values in self.responses.items():
+            length = len(values)
+            logger.debug(f"node name: {node}, number of responses: {length}")
+        self.parse_metrics(metric_names, namespace)
+        logger.debug(f"metrics: {self.metrics}")
+
+    def start(self, metric_names: list[str], namespace: str, interval: int = 1):
+        """
+        Starts the periodic fetching of metrics.
+        """
+        real_metrics = self.get_real_metrics(metric_names)
+        if self.thread is None or not self.thread.is_alive():
+            self.thread = Thread(target=self.fetch_metrics_periodically, args=(real_metrics, namespace, interval))
+            self.thread.start()
+            logger.info("MetricFetcher started.")
+
+    def stop(self):
+        """
+        Stops the periodic fetching of metrics.
+        """
+        if self.thread is not None and self.thread.is_alive():
+            self.stop_event.set()
+            self.thread.join()
+            self.stop_event.clear()
+            logger.info("MetricFetcher stopped.")
+
+    def save_results_to_file(self, output_folder: str):
+        """
+        Saves the calculated statistics to files.
+        """
+        self.save_raw_metrics(output_folder)
+        self.save_summary_table(output_folder)
+        logger.info(f"Results saved to: {output_folder}")
+
+# Example usage
+if __name__ == "__main__":
+    # Define the exporter endpoints, port, and output folder
+    metrics_endpoints = [{"node-name": "node1", "endpoint": "http://172.21.195.64:9100/metrics"}]  # Replace with your endpoint
+    result_folder = "results"  # Replace with your desired output folder (it must already exist)
+    metric_names = ["cpu_utilization", "memory", "sum_local_memory_bandwidth", "sum_total_memory_bandwidth"]
+    namespace = "benchmarking"
+    fetcher = MetricFetcher(metrics_endpoints, 9100)
+    # Start the MetricFetcher
+    fetcher.start(
+        metric_names,
+        namespace=namespace
+    )
+    # Wait for some time
+    time.sleep(15)
+    # Stop the MetricFetcher
+    fetcher.stop()
+    # Save results to the output folder
+    fetcher.save_results_to_file(result_folder)
\ No newline at end of file
diff --git a/evals/benchmark/stresscli/requirements.txt b/evals/benchmark/stresscli/requirements.txt
index ab52c2f8..3ad49cdb 100644
--- a/evals/benchmark/stresscli/requirements.txt
+++ b/evals/benchmark/stresscli/requirements.txt
@@ -8,3 +8,5 @@ numpy
 pytest
 pyyaml
 requests
+prometheus_client
+kubernetes
\ No newline at end of file
diff --git a/evals/benchmark/stresscli/run.yaml b/evals/benchmark/stresscli/run.yaml
index e5f8873d..7d2fa295 100644
--- a/evals/benchmark/stresscli/run.yaml
+++ b/evals/benchmark/stresscli/run.yaml
@@ -20,6 +20,18 @@ profile:
       - "chatqna-tei"
       - "chatqna-tgi"
       - "chatqna-teirerank"
+    utilization-metric-collect: False
+    utilization-metric-endpoint:
+      - node-name: "node1"
+        endpoint: "http://10.233.102.173:9101/metrics"
+      - node-name: "node2"
+        endpoint: "http://10.233.75.49:9101/metrics"
+    utilization-metric-port: 9101
+    utilization-metric-names:
+      - "cpu_utilization"
+      - "memory"
+      - "sum_local_memory_bandwidth"
+      - "sum_total_memory_bandwidth"
   runs:
     - name: sample
      users: 2
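Reviewer note: below is a minimal sketch, not part of the patch, of the Prometheus exposition format that `MetricFetcher.parse_metrics()` assumes from the memory-bandwidth-exporter. The `rdt_container_` prefix and the `nameSpace`/`containerId`/`podName` label names come from the code above; the sample values, pod name, and container id are made up for illustration.

```python
from prometheus_client.parser import text_string_to_metric_families

# Hypothetical scrape output from one memory-bandwidth-exporter pod.
SAMPLE_EXPOSITION = """\
# TYPE rdt_container_cpu_utilization gauge
rdt_container_cpu_utilization{nameSpace="benchmarking",podName="example-pod",containerId="abc123"} 42.5
# TYPE rdt_container_sum_total_memory_bandwidth gauge
rdt_container_sum_total_memory_bandwidth{nameSpace="benchmarking",podName="example-pod",containerId="abc123"} 1024.0
"""

for family in text_string_to_metric_families(SAMPLE_EXPOSITION):
    for name, labels, value, *_ in family.samples:
        # parse_metrics() keeps a sample only if its name is in the requested list
        # (after the rdt_container_ prefix is added by get_real_metrics) and the
        # nameSpace label matches the benchmark namespace; values are then grouped
        # per node, metric, and container before being written to CSV.
        print(name, labels["containerId"], labels["podName"], value)
```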