From bf9cff12a31ad924026e39a9d0db4eb69b76a738 Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Tue, 17 Dec 2024 23:31:02 +0500 Subject: [PATCH 1/7] - Added a single run file for all algorithms - Updated config.json file structure according to run.py requirements. - Added queries.json file in root directory --- config.json | 43 ++++++ queries.json | 46 ++++++ run.py | 409 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 498 insertions(+) create mode 100644 config.json create mode 100644 queries.json create mode 100644 run.py diff --git a/config.json b/config.json new file mode 100644 index 00000000..a4cd7e7c --- /dev/null +++ b/config.json @@ -0,0 +1,43 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "admin123", + "db-name": "ann" + }, + "benchmark-info": { + "name": "ann-benchmark", + "instance-size": "db.m6i.large", + "instance-service": "RDS", + "provider": "aws", + "description": "This is a benchmark for ANN search" + }, + "cases": [ + { + "db-label": "run1-seqon-new", + "vdb-command": "pgvectorhnsw", + "vector-ext": "pgvector", + "index-type": "hnsw", + "case-type": "Performance1536D500K", + "drop-old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "index-params": { + "m": 16, + "ef-construction": 64, + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2 + }, + "search-params": { + "ef-search": [10, 20], + "query-rescore": [100, 200, 300, 500], + "test": 10 + }, + "num-concurrency": "1,5", + "concurrency-duration": 30, + "k": 10, + "run-count": 1 + } + ] +} diff --git a/queries.json b/queries.json new file mode 100644 index 00000000..dad75817 --- /dev/null +++ b/queries.json @@ -0,0 +1,46 @@ +[ + { + "description": "Postgresql Version", + "query": "SELECT version();" + }, + { + "description": "List of extensions installed on postgersql server", + "query": "SELECT extname AS name, extversion AS version FROM pg_extension ORDER BY extname;" + }, + { + "description": 
"Hit Ratio, Reads, Writes from pg_stat_io", + "query": "SELECT (hits / (reads + hits)::float) AS hit_ratio, reads, writes FROM pg_stat_io WHERE backend_type = 'client backend' AND context = 'normal' AND object = 'relation';" + }, + { + "description": "Buffer Usage from pg_buffercache", + "query": "WITH state AS (SELECT count(*) FILTER (WHERE relfilenode IS NOT NULL) AS used, count(*) FILTER (WHERE relfilenode IS NULL) AS empty, count(*) AS total FROM pg_buffercache) SELECT *, round(used * 1.0 / total * 100, 1) AS percent FROM state;" + }, + { + "description": "Hit Ratio for Tables from pg_statio_user_tables", + "query": "SELECT SUM(heap_blks_read) as heap_read, SUM(heap_blks_hit) as heap_hit, SUM(heap_blks_hit) / (SUM(heap_blks_hit) + SUM(heap_blks_read)) as hit_ratio FROM pg_statio_user_tables;" + }, + { + "description": "Hit Ratio for Indexes from pg_statio_user_indexes", + "query": "SELECT SUM(idx_blks_read) as idx_read, SUM(idx_blks_hit) as idx_hit, (SUM(idx_blks_hit) - SUM(idx_blks_read)) / SUM(idx_blks_hit) as ratio FROM pg_statio_user_indexes;" + }, + { + "description": "Index Hit Ratio with Table and Index Names", + "query": "SELECT t.schemaname, t.relname as \"Table Name\", io_i.indexrelname as \"Index Name\", CASE WHEN (io_i.idx_blks_hit <> 0 OR io_i.idx_blks_read <> 0) THEN round(io_i.idx_blks_hit / (io_i.idx_blks_hit::numeric + io_i.idx_blks_read::numeric), 4) ELSE null END as \"Index Hit Ratio\" FROM pg_stat_user_tables t JOIN pg_statio_user_indexes io_i ON io_i.relid = t.relid ORDER BY \"Index Hit Ratio\" DESC;" + }, + { + "description": "Buffer Cache Usage (Top 10 Relations)", + "query": "SELECT c.relname, count(*) AS buffers FROM pg_buffercache b INNER JOIN pg_class c ON b.relfilenode = pg_relation_filenode(c.oid) AND b.reldatabase IN (0, (SELECT oid FROM pg_database WHERE datname = current_database())) GROUP BY c.relname ORDER BY 2 DESC LIMIT 10;" + }, + { + "description": "Top 10 Relations Residing in Memory", + "query": "SELECT c.relname, 
pg_size_pretty(count(*)*8192) AS buffer_size, pg_size_pretty(pg_relation_size(c.oid)) as relation_size, Round(100.0 * Count(*) / (SELECT setting FROM pg_settings WHERE name = 'shared_buffers') :: INTEGER, 2) AS buffers_percent, ROUND(count(*)*8192*100/ pg_relation_size(c.oid)::numeric, 2 ) AS relation_percent, CASE WHEN c.relkind = 'r' THEN 'table' WHEN c.relkind = 'i' THEN 'index' WHEN c.relkind = 'S' THEN 'sequence' WHEN c.relkind = 't' THEN 'TOAST table' WHEN c.relkind = 'v' THEN 'view' WHEN c.relkind = 'm' THEN 'materialized view' WHEN c.relkind = 'c' THEN 'composite type' WHEN c.relkind = 'f' THEN 'foreign table' WHEN c.relkind = 'p' THEN 'partitioned table' WHEN c.relkind = 'I' THEN 'partitioned index' ELSE 'Unexpected relkind' END as relation_type FROM pg_class c INNER JOIN pg_buffercache b ON b.relfilenode = c.relfilenode INNER JOIN pg_database d ON ( b.reldatabase = d.oid AND d.datname = Current_database() ) GROUP BY c.relname, c.oid ORDER BY pg_total_relation_size(c.oid) DESC LIMIT 10;" + }, + { + "description": "How many time the index is used?", + "query": "SELECT relname, 100 * idx_scan / (seq_scan + idx_scan) percent_of_times_index_used, n_live_tup rows_in_table FROM pg_stat_user_tables WHERE seq_scan + idx_scan > 0 ORDER BY n_live_tup DESC;" + }, + { + "description": "Reset Statistics", + "query": "SELECT pg_stat_reset();" + } +] diff --git a/run.py b/run.py new file mode 100644 index 00000000..6a690e64 --- /dev/null +++ b/run.py @@ -0,0 +1,409 @@ +import argparse +import json +import time +import subprocess +from typing import List +import psycopg2 +import os +import logging +from psycopg2 import sql +from contextlib import redirect_stdout +from itertools import product + +logger = logging.getLogger(__file__) +logger.setLevel(logging.DEBUG) +handler = logging.StreamHandler() +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) 
+logger.addHandler(handler) + +os.environ["LOG_LEVEL"] = "DEBUG" + + +def load_config(json_file): + with open(json_file, 'r') as file: + config = json.load(file) + return config + +def setup_database(config): + try: + conn = psycopg2.connect( + dbname='postgres', + user=config['database']['username'], + password=config['database']['password'], + host=config['database']['host'] + ) + conn.autocommit = True + cursor = conn.cursor() + # Create the database if it doesn't exist + cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db-name']]) + if not cursor.fetchone(): + cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db-name']))) + conn.close() + + # Connect to the new database to create the extension + conn = psycopg2.connect( + dbname=config['database']['db-name'], + user=config['database']['username'], + password=config['database']['password'], + host=config['database']['host'] + ) + cursor = conn.cursor() + cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") + cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") + cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") + conn.commit() + conn.close() + except Exception as e: + print(f"Setup failed: {e}") + +def get_stats(config): + with open('queries.json', 'r') as file: + queries = json.load(file) + try: + conn = psycopg2.connect( + dbname=config['db-name'], + user=config['username'], + password=config['password'], + host=config['host'] + ) + cur = conn.cursor() + for item in queries: + query = item['query'] + description = item['description'] + print(f"\nRunning query: {description}") + try: + cur.execute(query) + rows = cur.fetchall() + headers = [desc[0] for desc in cur.description] + print(f"{' | '.join(headers)}") + for row in rows: + print(f"{' | '.join(map(str, row))}") + except Exception as e: + print(f"Failed to run query: {e}") + conn.close() + except Exception as e: + print(f"Setup failed: {e}") + 
finally: + conn.close() + +def run_pre_warm(config): + print(f"Running pre warm for database:{config['db-name']}") + try: + conn = psycopg2.connect( + dbname=config['db-name'], + user=config['username'], + password=config['password'], + host=config['host'], + ) + cursor = conn.cursor() + cursor.execute("SELECT pg_prewarm('public.pgvector_index') as block_loaded") + conn.commit() + + result = cursor.fetchone() + print(f"Pre-warm blocks loaded: {result[0]}") + conn.close() + except Exception as e: + print(f"Failed to pre-warm the database: {e}") + +def teardown_database(config): + # Optionally drop the database after the test + pass + +def query_configurations(config): + # List of configuration parameters to query + config_queries = [ + "SHOW checkpoint_timeout;", + "SHOW effective_cache_size;", + "SHOW jit;", + "SHOW maintenance_work_mem;", + "SHOW max_parallel_maintenance_workers;", + "SHOW max_parallel_workers;", + "SHOW max_parallel_workers_per_gather;", + "SHOW max_wal_size;", + "SHOW max_worker_processes;", + "SHOW shared_buffers;", + "SHOW wal_compression;", + "SHOW work_mem;" + ] + + try: + conn = psycopg2.connect( + dbname=config['db-name'], + user=config['username'], + password=config['password'], + host=config['host'] + ) + cursor = conn.cursor() + results = [] + + # Execute each query and collect the result + for query in config_queries: + cursor.execute(query) + result = cursor.fetchone() + results.append(result[0] if result else None) + + # Print the raw output to debug + print("Raw query results:", results) + + config_dict = { + "checkpoint_timeout": results[0], + "effective_cache_size": results[1], + "jit": results[2], + "maintenance_work_mem": results[3], + "max_parallel_maintenance_workers": results[4], + "max_parallel_workers": results[5], + "max_parallel_workers_per_gather": results[6], + "max_wal_size": results[7], + "max_worker_processes": results[8], + "shared_buffers": results[9], + "wal_compression": results[10], + "work_mem": results[11] + } 
+ + conn.close() + return config_dict + except Exception as e: + print(f"Failed to query configurations: {e}") + return {} + +def get_base_command(case: dict, db_config: dict) -> list: + base_command = [ + "vectordbbench", case["vdb-command"], + "--user-name", db_config["username"], + "--password", db_config["password"], + "--host", db_config["host"], + "--db-name", db_config["db-name"], + "--case-type", case["case-type"], + "--num-concurrency", case["num-concurrency"], + "--concurrency-duration", str(case["concurrency-duration"]) + ] + + # Handle initial flags (no skip for the first ef_search) + if case.get("drop-old", True): + base_command.append("--drop-old") + else: + base_command.append("--skip-drop-old") + + if case.get("load", True): + base_command.append("--load") + else: + base_command.append("--skip-load") + + if case.get("search-serial", True): + base_command.append("--search-serial") + else: + base_command.append("--skip-search-serial") + + if case.get("search-concurrent", True): + base_command.append("--search-concurrent") + else: + base_command.append("--skip-search-concurrent") + + for key, value in case["index-params"].items(): + base_command.extend([f"--{key}", str(value)]) + + return base_command + +def handle_drop_old_load_flags(command) -> list[str]: + """If --drop-old or --load flags are present, remove them and add skip flags""" + command = [arg for arg in command if arg not in ["--drop-old", "--load"]] + if "--skip-drop-old" not in command: + command.append("--skip-drop-old") + if "--skip-load" not in command: + command.append("--skip-load") + return command + +def get_extension_version(db_config: dict): + try: + conn = psycopg2.connect( + dbname=db_config['db-name'], + user=db_config['username'], + password=db_config['password'], + host=db_config['host'] + ) + cursor = conn.cursor() + cursor.execute("SELECT extname, extversion FROM pg_extension WHERE extname LIKE '%vec%' OR extname LIKE '%ann%' OR extname = 'scann';") + extensions = 
cursor.fetchall() + conn.close() + extensions = {ext[0]: ext[1] for ext in extensions} + return extensions + except Exception as e: + print(f"Failed to get extension versions: {e}") + return {} + +def get_postgres_version(db_config: dict): + try: + conn = psycopg2.connect( + dbname=db_config['db-name'], + user=db_config['username'], + password=db_config['password'], + host=db_config['host'] + ) + cursor = conn.cursor() + cursor.execute("SELECT version();") + pgversion = cursor.fetchone() + conn.close() + return pgversion[0] + except Exception as e: + print(f"Failed to get extension versions: {e}") + return "" + +def get_output_dir_path( + case: dict, + benchmark_info: dict, + search_params: List[str | int], + run: int, +) -> str: + output_dir = f"results/{case['vector-ext']}/{case['index-type']}/{case['db-label']}/{benchmark_info['provider']}/{benchmark_info['instance-service']}/{benchmark_info['instance-service']}/" + for key, value in case["index-params"].items(): + if key not in ["maintenance-work-mem", "max-parallel-workers"]: + output_dir += f"{value}-" + for val in search_params: + if val.isdigit(): + output_dir += f"{val}-" + output_dir += f"{case['case-type']}-{run}-{int(time.time())}" + return output_dir + +def print_configuration( + case: dict, + benchmark_info: dict, + db_config: dict, + command: list, + output_file, +): + with redirect_stdout(output_file): + logger.info("Benchmark Information:") + for key, value in benchmark_info.items(): + logger.info(f"{key}: {value}") + output_file.flush() + + logger.info("Benchmark Test Run Information:") + for key, value in case.items(): + logger.info(f"{key}: {value}") + output_file.flush() + + logger.info(f"Postgres Database Configuration") + current_configs = query_configurations(db_config) + for key, value in current_configs.items(): + logger.info(f"{key}: {value}") + output_file.flush() + + logger.info(f"Get Buffer Hit Ratio Stats") + get_stats(db_config) + output_file.flush() + + logger.info(f"Running 
command: {' '.join(command)}") + output_file.flush() + +def generate_combinations(config_dict: dict) -> list: + keys = [] + values = [] + for key, value in config_dict.items(): + keys.append(f"--{key}") + if isinstance(value, list): + values.append(value) + else: + values.append([value]) + + combinations = [] + for combo in product(*values): + combination = [] + for k, v in zip(keys, combo): + combination.append((k, str(v))) + combinations.append([item for pair in combination for item in pair]) + + logger.info(f"Total combinations generated: {len(combinations)}") + return combinations + +def generate_benchmark_metadata( + metadata: dict, + start_time: str, + end_time: str, + output_dir: str, +): + metadata["benchmark-info"]["extension-versions"] = get_extension_version(metadata['database']) + metadata["benchmark-info"]["postgres_version"] = get_postgres_version(metadata['database']) + metadata["benchmark-info"]["start_time"] = start_time + metadata["benchmark-info"]["end_time"] = end_time + del metadata["database"] + + output_filename = f"{output_dir}benchmark_metadata.json" + with open(output_filename, "w") as f: + json.dump(metadata, f, indent=4) + logger.info(f"Benchmark metadata saved to {output_filename}") + +def run_benchmark(case, db_config, benchmark_info, dry_run=False): + base_command = get_base_command(case, db_config) + run_count = case.get("run-count", 1) # Default to 1 if not specified + for run in range(run_count): + print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") + for i, search_params in enumerate(generate_combinations(case["search-params"])): + command = base_command + search_params + if i > 0 or run > 0: + command = handle_drop_old_load_flags(command) + + if dry_run: + logger.info(f"Command: {' '.join(command)}") + logger.info(f"Output Dir: {get_output_dir_path(case, benchmark_info, search_params, run)}") + logger.info(f"Extra Information: {get_extension_version(db_config)}", "\n") + else: + try: + output_dir = 
get_output_dir_path(case, benchmark_info, search_params, run) + os.environ["RESULTS_LOCAL_DIR"] = output_dir + os.makedirs(output_dir, exist_ok=True) + + with open(f"{output_dir}/log.txt", 'w') as f: + print_configuration(case, benchmark_info, db_config, command, f) + run_pre_warm(db_config) + f.flush() + + logger.info("***********START***********") + start_time = time.time() + # Capture both stdout and stderr and write them to the log file + subprocess.run(command, check=True, stdout=f, stderr=f) + end_time = time.time() + execution_time = end_time - start_time + logger.info(f"total_duration={execution_time}") + logger.info("***********END***********") + + with redirect_stdout(f): + get_stats(db_config) + f.flush() + f.flush() + except subprocess.CalledProcessError as e: + logger.error(f"Benchmark Failed: {e}") + logger.info("Sleeping for 1 min") + time.sleep(60) + +def parse_argument(): + parser = argparse.ArgumentParser(description="Run HNSW benchmark") + parser.add_argument("--dry-run", action="store_true", help="Print commands and output directory without executing") + return parser.parse_args() + + +def main(): + args = parse_argument() + config = load_config("config.json") + start_time = time.time() + start_timeh = time.strftime('%Y-%m-%d %H:%M:%S') + benchmark_info = config["benchmark-info"] + logger.info(f"Benchmark run start time: {time.strftime('%Y-%m-%d %H:%M:%S')}") + for case in config['cases']: + print(f"Running case: {case['db-label']}") + setup_database(config) + run_benchmark(case, config['database'], config["benchmark-info"], args.dry_run) + teardown_database(config) + end_timeh = time.strftime('%Y-%m-%d %H:%M:%S') + metadata_output_dir = f"home/emumba/emumbaorg/results/{case['vector-ext']}/{case['index-type']}/{case['db-label']}/{benchmark_info['provider']}/{benchmark_info['instance-service']}/{benchmark_info['instance-service']}/" + generate_benchmark_metadata(config, start_timeh, end_timeh, metadata_output_dir) + + end_time = time.time() + 
execution_time = end_time - start_time + logger.info(f"Benchmark run end time: {time.strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"COMPLETED ALL EXECUTIONS. total_duration={execution_time}") + +if __name__ == "__main__": + main() + From a69305c203a24bcf3673495f889e696bd38e78c6 Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Thu, 19 Dec 2024 16:37:20 +0500 Subject: [PATCH 2/7] Updated run and config file: - Added function to generate metadata. - Added function to get extension version. - Added function to get postgres function. --- config.json | 10 ++++------ run.py | 29 ++++++++++++++++++----------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/config.json b/config.json index a4cd7e7c..660626e0 100644 --- a/config.json +++ b/config.json @@ -2,7 +2,7 @@ "database": { "host": "localhost", "username": "postgres", - "password": "admin123", + "password": "postgres", "db-name": "ann" }, "benchmark-info": { @@ -16,7 +16,7 @@ { "db-label": "run1-seqon-new", "vdb-command": "pgvectorhnsw", - "vector-ext": "pgvector", + "vector-ext": "vector", "index-type": "hnsw", "case-type": "Performance1536D500K", "drop-old": true, @@ -30,11 +30,9 @@ "max-parallel-workers": 2 }, "search-params": { - "ef-search": [10, 20], - "query-rescore": [100, 200, 300, 500], - "test": 10 + "ef-search": [10, 20] }, - "num-concurrency": "1,5", + "num-concurrency": "1,10,20,30,40,50,60,70.80,90,100", "concurrency-duration": 30, "k": 10, "run-count": 1 diff --git a/run.py b/run.py index 6a690e64..245fa029 100644 --- a/run.py +++ b/run.py @@ -2,7 +2,7 @@ import json import time import subprocess -from typing import List +from typing import List, Optional import psycopg2 import os import logging @@ -14,7 +14,7 @@ logger.setLevel(logging.DEBUG) handler = logging.StreamHandler() handler.setLevel(logging.DEBUG) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') 
handler.setFormatter(formatter) logger.addHandler(handler) @@ -253,17 +253,23 @@ def get_postgres_version(db_config: dict): def get_output_dir_path( case: dict, benchmark_info: dict, - search_params: List[str | int], - run: int, + search_params: Optional[List[str | int]], + run: Optional[int], + db_config: dict, + base_dir: bool = False, ) -> str: - output_dir = f"results/{case['vector-ext']}/{case['index-type']}/{case['db-label']}/{benchmark_info['provider']}/{benchmark_info['instance-service']}/{benchmark_info['instance-service']}/" + ext_version = get_extension_version(db_config) + output_dir = f"results/{case['vector-ext']}-{ext_version[case['vector-ext']]}/{case['index-type']}/{case['db-label']}/{benchmark_info['provider']}/{benchmark_info['instance-service']}/{benchmark_info['instance-service']}/{case['case-type']}/" + if base_dir: + return output_dir + for key, value in case["index-params"].items(): if key not in ["maintenance-work-mem", "max-parallel-workers"]: output_dir += f"{value}-" for val in search_params: if val.isdigit(): output_dir += f"{val}-" - output_dir += f"{case['case-type']}-{run}-{int(time.time())}" + output_dir += f"{run}-{int(time.time())}" return output_dir def print_configuration( @@ -346,11 +352,11 @@ def run_benchmark(case, db_config, benchmark_info, dry_run=False): if dry_run: logger.info(f"Command: {' '.join(command)}") - logger.info(f"Output Dir: {get_output_dir_path(case, benchmark_info, search_params, run)}") - logger.info(f"Extra Information: {get_extension_version(db_config)}", "\n") + logger.info(f"Output Dir: {get_output_dir_path(case, benchmark_info, search_params, run, db_config)}") + logger.info(f"Extra Information: {get_extension_version(db_config)} \n") else: try: - output_dir = get_output_dir_path(case, benchmark_info, search_params, run) + output_dir = get_output_dir_path(case, benchmark_info, search_params, run, db_config) os.environ["RESULTS_LOCAL_DIR"] = output_dir os.makedirs(output_dir, exist_ok=True) @@ -396,8 
+402,9 @@ def main(): run_benchmark(case, config['database'], config["benchmark-info"], args.dry_run) teardown_database(config) end_timeh = time.strftime('%Y-%m-%d %H:%M:%S') - metadata_output_dir = f"home/emumba/emumbaorg/results/{case['vector-ext']}/{case['index-type']}/{case['db-label']}/{benchmark_info['provider']}/{benchmark_info['instance-service']}/{benchmark_info['instance-service']}/" - generate_benchmark_metadata(config, start_timeh, end_timeh, metadata_output_dir) + + output_dir = get_output_dir_path(case, benchmark_info, [], 0, db_config=config['database'], base_dir=True) + generate_benchmark_metadata(config, start_timeh, end_timeh, output_dir) end_time = time.time() execution_time = end_time - start_time From 59024beb3edd7823a3d1083c9305d4e6205629a6 Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Mon, 23 Dec 2024 19:58:34 +0500 Subject: [PATCH 3/7] Updated config file for testing single run file with different algorithms --- config.json | 71 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 10 deletions(-) diff --git a/config.json b/config.json index 660626e0..66e365b5 100644 --- a/config.json +++ b/config.json @@ -7,18 +7,18 @@ }, "benchmark-info": { "name": "ann-benchmark", - "instance-size": "db.m6i.large", - "instance-service": "RDS", - "provider": "aws", - "description": "This is a benchmark for ANN search" + "instance-size": "Standard_D8ds_v5", + "instance-service": "azure-vm", + "provider": "azure", + "description": "This benchmark is done to test the single run file and github workflows" }, "cases": [ { - "db-label": "run1-seqon-new", + "db-label": "hnsw-filtered-search", "vdb-command": "pgvectorhnsw", "vector-ext": "vector", "index-type": "hnsw", - "case-type": "Performance1536D500K", + "case-type": "Performance1536D500K1P", "drop-old": true, "load": true, "search-serial": true, @@ -26,16 +26,67 @@ "index-params": { "m": 16, "ef-construction": 64, - "maintenance-work-mem": "4GB", - 
"max-parallel-workers": 2 + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7 }, "search-params": { - "ef-search": [10, 20] + "ef-search": [40, 60] }, "num-concurrency": "1,10,20,30,40,50,60,70.80,90,100", "concurrency-duration": 30, "k": 10, - "run-count": 1 + "run-count": 2 + }, + { + "db-label": "hnsw-bq-reranking-filtered-search", + "vdb-command": "pgvectorhnsw", + "vector-ext": "vector", + "index-type": "hnsw-bq", + "drop-old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "index-params": { + "m": 16, + "ef-construction": 64, + "quantization-type": "bit", + "reranking": true, + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7 + }, + "search-params": { + "ef-search": [200], + "quantized-fetch-limit": [200, 100] + }, + "case-type": "Performance1536D500K1P", + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 2 + }, + { + "db-label": "filtered-search", + "vdb-command": "pgdiskann", + "vector-ext": "pgdiskann", + "index-type": "diskann", + "drop-old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "index-params": { + "l-value-ib": 128, + "max-neighbors": 32, + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7 + }, + "search-params": { + "l-value-is": [32, 64] + }, + "case-type": "Performance1536D500K1P", + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run-count": 2 } ] } From 5ae0572c005bb4533921c96ae2a3b3ef4b7684ff Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Mon, 23 Dec 2024 19:59:46 +0500 Subject: [PATCH 4/7] Deleted multiple run files. 
--- run-scripts/queries.json | 46 ------ run-scripts/run-dann.py | 278 ------------------------------- run-scripts/run-hnsw-bq.py | 287 -------------------------------- run-scripts/run-hnsw-churn.py | 288 -------------------------------- run-scripts/run-hnsw.py | 277 ------------------------------- run-scripts/run-pvs-plain.py | 295 --------------------------------- run-scripts/run-pvs-sbq.py | 300 ---------------------------------- 7 files changed, 1771 deletions(-) delete mode 100644 run-scripts/queries.json delete mode 100644 run-scripts/run-dann.py delete mode 100644 run-scripts/run-hnsw-bq.py delete mode 100644 run-scripts/run-hnsw-churn.py delete mode 100644 run-scripts/run-hnsw.py delete mode 100644 run-scripts/run-pvs-plain.py delete mode 100644 run-scripts/run-pvs-sbq.py diff --git a/run-scripts/queries.json b/run-scripts/queries.json deleted file mode 100644 index dad75817..00000000 --- a/run-scripts/queries.json +++ /dev/null @@ -1,46 +0,0 @@ -[ - { - "description": "Postgresql Version", - "query": "SELECT version();" - }, - { - "description": "List of extensions installed on postgersql server", - "query": "SELECT extname AS name, extversion AS version FROM pg_extension ORDER BY extname;" - }, - { - "description": "Hit Ratio, Reads, Writes from pg_stat_io", - "query": "SELECT (hits / (reads + hits)::float) AS hit_ratio, reads, writes FROM pg_stat_io WHERE backend_type = 'client backend' AND context = 'normal' AND object = 'relation';" - }, - { - "description": "Buffer Usage from pg_buffercache", - "query": "WITH state AS (SELECT count(*) FILTER (WHERE relfilenode IS NOT NULL) AS used, count(*) FILTER (WHERE relfilenode IS NULL) AS empty, count(*) AS total FROM pg_buffercache) SELECT *, round(used * 1.0 / total * 100, 1) AS percent FROM state;" - }, - { - "description": "Hit Ratio for Tables from pg_statio_user_tables", - "query": "SELECT SUM(heap_blks_read) as heap_read, SUM(heap_blks_hit) as heap_hit, SUM(heap_blks_hit) / (SUM(heap_blks_hit) + 
SUM(heap_blks_read)) as hit_ratio FROM pg_statio_user_tables;" - }, - { - "description": "Hit Ratio for Indexes from pg_statio_user_indexes", - "query": "SELECT SUM(idx_blks_read) as idx_read, SUM(idx_blks_hit) as idx_hit, (SUM(idx_blks_hit) - SUM(idx_blks_read)) / SUM(idx_blks_hit) as ratio FROM pg_statio_user_indexes;" - }, - { - "description": "Index Hit Ratio with Table and Index Names", - "query": "SELECT t.schemaname, t.relname as \"Table Name\", io_i.indexrelname as \"Index Name\", CASE WHEN (io_i.idx_blks_hit <> 0 OR io_i.idx_blks_read <> 0) THEN round(io_i.idx_blks_hit / (io_i.idx_blks_hit::numeric + io_i.idx_blks_read::numeric), 4) ELSE null END as \"Index Hit Ratio\" FROM pg_stat_user_tables t JOIN pg_statio_user_indexes io_i ON io_i.relid = t.relid ORDER BY \"Index Hit Ratio\" DESC;" - }, - { - "description": "Buffer Cache Usage (Top 10 Relations)", - "query": "SELECT c.relname, count(*) AS buffers FROM pg_buffercache b INNER JOIN pg_class c ON b.relfilenode = pg_relation_filenode(c.oid) AND b.reldatabase IN (0, (SELECT oid FROM pg_database WHERE datname = current_database())) GROUP BY c.relname ORDER BY 2 DESC LIMIT 10;" - }, - { - "description": "Top 10 Relations Residing in Memory", - "query": "SELECT c.relname, pg_size_pretty(count(*)*8192) AS buffer_size, pg_size_pretty(pg_relation_size(c.oid)) as relation_size, Round(100.0 * Count(*) / (SELECT setting FROM pg_settings WHERE name = 'shared_buffers') :: INTEGER, 2) AS buffers_percent, ROUND(count(*)*8192*100/ pg_relation_size(c.oid)::numeric, 2 ) AS relation_percent, CASE WHEN c.relkind = 'r' THEN 'table' WHEN c.relkind = 'i' THEN 'index' WHEN c.relkind = 'S' THEN 'sequence' WHEN c.relkind = 't' THEN 'TOAST table' WHEN c.relkind = 'v' THEN 'view' WHEN c.relkind = 'm' THEN 'materialized view' WHEN c.relkind = 'c' THEN 'composite type' WHEN c.relkind = 'f' THEN 'foreign table' WHEN c.relkind = 'p' THEN 'partitioned table' WHEN c.relkind = 'I' THEN 'partitioned index' ELSE 'Unexpected relkind' END as 
relation_type FROM pg_class c INNER JOIN pg_buffercache b ON b.relfilenode = c.relfilenode INNER JOIN pg_database d ON ( b.reldatabase = d.oid AND d.datname = Current_database() ) GROUP BY c.relname, c.oid ORDER BY pg_total_relation_size(c.oid) DESC LIMIT 10;" - }, - { - "description": "How many time the index is used?", - "query": "SELECT relname, 100 * idx_scan / (seq_scan + idx_scan) percent_of_times_index_used, n_live_tup rows_in_table FROM pg_stat_user_tables WHERE seq_scan + idx_scan > 0 ORDER BY n_live_tup DESC;" - }, - { - "description": "Reset Statistics", - "query": "SELECT pg_stat_reset();" - } -] diff --git a/run-scripts/run-dann.py b/run-scripts/run-dann.py deleted file mode 100644 index 3f7325dc..00000000 --- a/run-scripts/run-dann.py +++ /dev/null @@ -1,278 +0,0 @@ -import json -import time -from contextlib import redirect_stdout -import random -import subprocess -import psycopg2 -from psycopg2 import sql -import os - -os.environ["LOG_LEVEL"] = "DEBUG" - -def load_config(json_file): - with open(json_file, 'r') as file: - config = json.load(file) - return config - -def setup_database(config): - try: - conn = psycopg2.connect( - dbname='postgres', - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - conn.autocommit = True - cursor = conn.cursor() - # Create the database if it doesn't exist - cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db-name']]) - if not cursor.fetchone(): - cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db-name']))) - conn.close() - - # Connect to the new database to create the extension - conn = psycopg2.connect( - dbname=config['database']['db-name'], - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - cursor = conn.cursor() - cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") - cursor.execute("CREATE 
EXTENSION IF NOT EXISTS pg_diskann;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") - conn.commit() - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - -def get_stats(config): - with open('queries.json', 'r') as file: - queries = json.load(file) - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cur = conn.cursor() - for item in queries: - query = item['query'] - description = item['description'] - print(f"\nRunning query: {description}") - try: - cur.execute(query) - rows = cur.fetchall() - headers = [desc[0] for desc in cur.description] - print(f"{' | '.join(headers)}") - for row in rows: - print(f"{' | '.join(map(str, row))}") - except Exception as e: - print(f"Failed to run query: {e}") - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - finally: - conn.close() - -def pre_warm(config): - print(f"Running pre warm for database:{config['db-name']}") - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'], - ) - cursor = conn.cursor() - cursor.execute("SELECT pg_prewarm('public.pgvector_index') as block_loaded") - conn.commit() - - result = cursor.fetchone() - print(f"Pre-warm blocks loaded: {result[0]}") - conn.close() - except Exception as e: - print(f"Failed to pre-warm the database: {e}") - -def teardown_database(config): - # Optionally drop the database after the test - pass - -def query_configurations(config): - # List of configuration parameters to query - config_queries = [ - "SHOW checkpoint_timeout;", - "SHOW effective_cache_size;", - "SHOW jit;", - "SHOW maintenance_work_mem;", - "SHOW max_parallel_maintenance_workers;", - "SHOW max_parallel_workers;", - "SHOW max_parallel_workers_per_gather;", - "SHOW max_wal_size;", - "SHOW max_worker_processes;", - 
"SHOW shared_buffers;", - "SHOW wal_compression;", - "SHOW work_mem;" - ] - - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cursor = conn.cursor() - results = [] - - # Execute each query and collect the result - for query in config_queries: - cursor.execute(query) - result = cursor.fetchone() - results.append(result[0] if result else None) - - # Print the raw output to debug - print("Raw query results:", results) - - config_dict = { - "checkpoint_timeout": results[0], - "effective_cache_size": results[1], - "jit": results[2], - "maintenance_work_mem": results[3], - "max_parallel_maintenance_workers": results[4], - "max_parallel_workers": results[5], - "max_parallel_workers_per_gather": results[6], - "max_wal_size": results[7], - "max_worker_processes": results[8], - "shared_buffers": results[9], - "wal_compression": results[10], - "work_mem": results[11] - } - - conn.close() - return config_dict - except Exception as e: - print(f"Failed to query configurations: {e}") - return {} - - -def run_benchmark(case, db_config, benchmark_description): - base_command = [ - "vectordbbench", "pgdiskann", - "--user-name", db_config['username'], - "--password", db_config['password'], - "--host", db_config['host'], - "--db-name", db_config['db-name'] - ] - - # Handle initial flags (no skip for the first ef_search) - if case.get("db-name", True): - base_command.append("--drop-old") - else: - base_command.append("--skip-drop-old") - - if case.get("load", True): - base_command.append("--load") - else: - base_command.append("--skip-load") - - if case.get("search-serial", True): - base_command.append("--search-serial") - else: - base_command.append("--skip-search-serial") - - if case.get("search-concurrent", True): - base_command.append("--search-concurrent") - else: - base_command.append("--skip-search-concurrent") - - base_command.extend([ - "--case-type", case["case-type"], - 
"--maintenance-work-mem", case["maintenance-work-mem"], - "--max-parallel-workers", str(case["max-parallel-workers"]), - "--l-value-ib", str(case["l-value-ib"]), - "--max-neighbors", str(case["max-neighbors"]), - "--k", str(case["k"]), - "--num-concurrency", case["num-concurrency"], - "--concurrency-duration", str(case["concurrency-duration"]) - ]) - - run_count = case.get("run-count", 1) # Default to 1 if not specified - - for run in range(run_count): - print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") - for i, l_value_is in enumerate(case["l-value-is"]): - command = base_command + ["--l-value-is", str(l_value_is)] - - if i > 0 or run > 0: - # Remove conflicting --drop-old and --load flags - command = [arg for arg in command if arg not in ["--drop-old", "--load"]] - # Add skip flags if they are not already in the command - if "--skip-drop-old" not in command: - command.append("--skip-drop-old") - if "--skip-load" not in command: - command.append("--skip-load") - - try: - random_number = random.randint(1, 100000) - print(f"Running command: {' '.join(command)}") - output_dir = f"results/pgdiskann/diskann/{case['db-label']}/{db_config['provider']}/{db_config['instance-type']}-{str(case['max-neighbors'])}-{str(case['l-value-ib'])}-{l_value_is}-{case['case-type']}-{run}-{random_number}" - os.environ["RESULTS_LOCAL_DIR"] = output_dir - - os.makedirs(output_dir, exist_ok=True) - - with open(f"{output_dir}/log.txt", 'w') as f: - with redirect_stdout(f): - print(f"Running benchmark: {benchmark_description}") - print(f"DB Instance Type: {db_config['instance-type']}") - print(f"DB Instance Provider: {db_config['provider']}") - print(f"DB enable-seqscan: {db_config['enable-seqscan']}") - for key, value in case.items(): - if key == "l-value-is": - print(f"{key}: {l_value_is}") - print(f"{key}: {value}") - print("Current PostgreSQL configurations:") - current_configs = query_configurations(db_config) - for key, value in current_configs.items(): - 
print(f"{key}: {value}") - get_stats(db_config) - f.flush() - pre_warm(db_config) - print(f"Running command: {' '.join(command)}") - f.flush() - - print("***********START***********") - start_time = time.time() - # Capture both stdout and stderr and write them to the log file - subprocess.run(command, check=True, stdout=f, stderr=f) - end_time = time.time() - execution_time = end_time - start_time - print(f"total_duration={execution_time}") - print("***********END***********") - with redirect_stdout(f): - get_stats(db_config) - f.flush() - f.flush() - except subprocess.CalledProcessError as e: - print(f"Benchmark failed: {e}") - print("Sleeping for 30 sec") - time.sleep(60) - -def main(): - config = load_config("config.json") - start_time = time.time() - for case in config['cases']: - print(f"Running case: {case['db-label']}") - setup_database(config) - - run_benchmark(case, config['database'], config['benchmark-description']) - teardown_database(config) - end_time = time.time() - execution_time = end_time - start_time - print(f"COMPLETED ALL EXECUTIONS. 
total_duration={execution_time}") - -if __name__ == "__main__": - main() - diff --git a/run-scripts/run-hnsw-bq.py b/run-scripts/run-hnsw-bq.py deleted file mode 100644 index d7296c73..00000000 --- a/run-scripts/run-hnsw-bq.py +++ /dev/null @@ -1,287 +0,0 @@ -import json -import time -from contextlib import redirect_stdout -import random -import subprocess -import psycopg2 -from psycopg2 import sql -import os - -os.environ["LOG_LEVEL"] = "DEBUG" - -def load_config(json_file): - with open(json_file, 'r') as file: - config = json.load(file) - return config - -def setup_database(config): - try: - conn = psycopg2.connect( - dbname='postgres', - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - conn.autocommit = True - cursor = conn.cursor() - # Create the database if it doesn't exist - cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db-name']]) - if not cursor.fetchone(): - cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db-name']))) - conn.close() - - # Connect to the new database to create the extension - conn = psycopg2.connect( - dbname=config['database']['db-name'], - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - cursor = conn.cursor() - cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") - conn.commit() - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - -def teardown_database(config): - # Optionally drop the database after the test - pass - - -def get_stats(config): - with open('queries.json', 'r') as file: - queries = json.load(file) - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cur = 
conn.cursor() - for item in queries: - query = item['query'] - description = item['description'] - print(f"\nRunning query: {description}") - try: - cur.execute(query) - rows = cur.fetchall() - headers = [desc[0] for desc in cur.description] - print(f"{' | '.join(headers)}") - for row in rows: - print(f"{' | '.join(map(str, row))}") - except Exception as e: - print(f"Failed to run query: {e}") - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - finally: - conn.close() - - -def query_configurations(config): - # List of configuration parameters to query - config_queries = [ - "SHOW checkpoint_timeout;", - "SHOW effective_cache_size;", - "SHOW jit;", - "SHOW maintenance_work_mem;", - "SHOW max_parallel_maintenance_workers;", - "SHOW max_parallel_workers;", - "SHOW max_parallel_workers_per_gather;", - "SHOW max_wal_size;", - "SHOW max_worker_processes;", - "SHOW shared_buffers;", - "SHOW wal_compression;", - "SHOW work_mem;" - ] - - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cursor = conn.cursor() - results = [] - - # Execute each query and collect the result - for query in config_queries: - cursor.execute(query) - result = cursor.fetchone() - results.append(result[0] if result else None) - - # Print the raw output to debug - print("Raw query results:", results) - - config_dict = { - "checkpoint_timeout": results[0], - "effective_cache_size": results[1], - "jit": results[2], - "maintenance_work_mem": results[3], - "max_parallel_maintenance_workers": results[4], - "max_parallel_workers": results[5], - "max_parallel_workers_per_gather": results[6], - "max_wal_size": results[7], - "max_worker_processes": results[8], - "shared_buffers": results[9], - "wal_compression": results[10], - "work_mem": results[11] - } - - conn.close() - return config_dict - except Exception as e: - print(f"Failed to query configurations: {e}") - return {} - - -def 
pre_warm(config): - print(f"Running pre warm for database:{config['db-name']}") - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'], - ) - cursor = conn.cursor() - cursor.execute("SELECT pg_prewarm('public.pgvector_index') as block_loaded") - conn.commit() - - result = cursor.fetchone() - print(f"Pre-warm blocks loaded: {result[0]}") - conn.close() - except Exception as e: - print(f"Failed to pre-warm the database: {e}") - -def run_benchmark(case, db_config, benchmark_description): - base_command = [ - "vectordbbench", "pgvectorhnsw", - "--user-name", db_config['username'], - "--password", db_config['password'], - "--host", db_config['host'], - "--db-name", db_config['db-name'] - ] - - # Handle initial flags (no skip for the first ef_search) - if case.get("db-name", True): - base_command.append("--drop-old") - else: - base_command.append("--skip-drop-old") - - if case.get("load", True): - base_command.append("--load") - else: - base_command.append("--skip-load") - - if case.get("search-serial", True): - base_command.append("--search-serial") - else: - base_command.append("--skip-search-serial") - - if case.get("search-concurrent", True): - base_command.append("--search-concurrent") - else: - base_command.append("--skip-search-concurrent") - - if case.get("reranking", True): - base_command.append("--reranking") - else: - base_command.append("--skip-reranking") - - base_command.extend([ - "--case-type", case["case-type"], - "--maintenance-work-mem", case["maintenance-work-mem"], - "--max-parallel-workers", str(case["max-parallel-workers"]), - "--ef-construction", str(case["ef-construction"]), - "--m", str(case["m"]), - "--k", str(case["k"]), - "--num-concurrency", case["num-concurrency"], - "--concurrency-duration", str(case["concurrency-duration"]), - "--quantization-type", str(case["quantization-type"]), - ]) - - run_count = case.get("run-count", 1) # Default to 1 if not 
specified - - - for run in range(run_count): - print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") - for i, ef_search in enumerate(case["ef-search"]): - command = base_command + ["--ef-search", str(ef_search)] - command = command + ["--quantized-fetch-limit", str(ef_search)] - - # Build the index only once. - if i > 0 or run > 0: - # Remove conflicting --drop-old and --load flags - command = [arg for arg in command if arg not in ["--drop-old", "--load"]] - # Add skip flags if they are not already in the command - if "--skip-drop-old" not in command: - command.append("--skip-drop-old") - if "--skip-load" not in command: - command.append("--skip-load") - - try: - random_number = random.randint(1, 100000) - print(f"Running command: {' '.join(command)}") - output_dir = f"results/pgvector/hnsw-bq/{case['db-label']}/{db_config['provider']}/{db_config['instance-type']}-{str(case['m'])}-{str(case['ef-construction'])}-{ef_search}-{case['case-type']}-{run}-{random_number}" - os.environ["RESULTS_LOCAL_DIR"] = output_dir - - os.makedirs(output_dir, exist_ok=True) - - with open(f"{output_dir}/log.txt", 'w') as f: - with redirect_stdout(f): - print(f"DB Instance Type: {db_config['instance-type']}") - print(f"DB Instance Provider: {db_config['provider']}") - print(f"DB enable-seqscan: {db_config['enable-seqscan']}") - for key, value in case.items(): - if key == "ef_search": - print(f"{key}: {ef_search}") - print(f"{key}: {value}") - print("Current PostgreSQL configurations:") - current_configs = query_configurations(db_config) - for key, value in current_configs.items(): - print(f"{key}: {value}") - get_stats(db_config) - f.flush() - pre_warm(db_config) - print(f"Running command: {' '.join(command)}") - f.flush() - - print("***********START***********") - start_time = time.time() - # Capture both stdout and stderr and write them to the log file - subprocess.run(command, check=True, stdout=f, stderr=f) - end_time = time.time() - execution_time = end_time - 
start_time - print(f"total_duration={execution_time}") - print("***********END***********") - with redirect_stdout(f): - get_stats(db_config) - f.flush() - f.flush() - except subprocess.CalledProcessError as e: - print(f"Benchmark failed: {e}") - print("Sleeping for 1 min") - time.sleep(60) - -def main(): - config = load_config("config.json") - start_time = time.time() - for case in config['cases']: - print(f"Running case: {case['db-label']}") - setup_database(config) - - run_benchmark(case, config['database'], config['benchmark-description']) - teardown_database(config) - end_time = time.time() - execution_time = end_time - start_time - print(f"COMPLETED ALL EXECUTIONS. total_duration={execution_time}") - -if __name__ == "__main__": - main() - diff --git a/run-scripts/run-hnsw-churn.py b/run-scripts/run-hnsw-churn.py deleted file mode 100644 index 736d8748..00000000 --- a/run-scripts/run-hnsw-churn.py +++ /dev/null @@ -1,288 +0,0 @@ -import json -import time -from contextlib import redirect_stdout -import random -import subprocess -import psycopg2 -from psycopg2 import sql -import os - -os.environ["LOG_LEVEL"] = "DEBUG" - -def load_config(json_file): - with open(json_file, 'r') as file: - config = json.load(file) - return config - -def setup_database(config): - try: - conn = psycopg2.connect( - dbname='postgres', - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - conn.autocommit = True - cursor = conn.cursor() - # Create the database if it doesn't exist - cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db-name']]) - if not cursor.fetchone(): - cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db-name']))) - conn.close() - - # Connect to the new database to create the extension - conn = psycopg2.connect( - dbname=config['database']['db-name'], - user=config['database']['username'], - 
password=config['database']['password'], - host=config['database']['host'] - ) - cursor = conn.cursor() - cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") - conn.commit() - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - -def teardown_database(config): - # Optionally drop the database after the test - pass - - -def get_stats(config): - current_dir = os.path.dirname(os.path.abspath(__file__)) - queries_file_path = os.path.join(current_dir, "queries.json") - with open(queries_file_path, 'r') as file: - queries = json.load(file) - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cur = conn.cursor() - for item in queries: - query = item['query'] - description = item['description'] - print(f"\nRunning query: {description}") - cur.execute(query) - rows = cur.fetchall() - headers = [desc[0] for desc in cur.description] - print(f"{' | '.join(headers)}") - for row in rows: - print(f"{' | '.join(map(str, row))}") - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - - -def pre_warm(config): - print(f"Running pre warm for database:{config['db-name']}") - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'], - ) - cursor = conn.cursor() - cursor.execute("SELECT pg_prewarm('public.pgvector_index') as block_loaded") - conn.commit() - - result = cursor.fetchone() - print(f"Pre-warm blocks loaded: {result[0]}") - conn.close() - except Exception as e: - print(f"Failed to pre-warm the database: {e}") - -def query_configurations(config): - # List of configuration parameters to query - config_queries = [ - "SHOW checkpoint_timeout;", - "SHOW effective_cache_size;", - "SHOW jit;", - "SHOW maintenance_work_mem;", - "SHOW 
max_parallel_maintenance_workers;", - "SHOW max_parallel_workers;", - "SHOW max_parallel_workers_per_gather;", - "SHOW max_wal_size;", - "SHOW max_worker_processes;", - "SHOW shared_buffers;", - "SHOW wal_compression;", - "SHOW work_mem;" - ] - - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cursor = conn.cursor() - results = [] - - # Execute each query and collect the result - for query in config_queries: - cursor.execute(query) - result = cursor.fetchone() - results.append(result[0] if result else None) - - # Print the raw output to debug - print("Raw query results:", results) - - config_dict = { - "checkpoint_timeout": results[0], - "effective_cache_size": results[1], - "jit": results[2], - "maintenance_work_mem": results[3], - "max_parallel_maintenance_workers": results[4], - "max_parallel_workers": results[5], - "max_parallel_workers_per_gather": results[6], - "max_wal_size": results[7], - "max_worker_processes": results[8], - "shared_buffers": results[9], - "wal_compression": results[10], - "work_mem": results[11] - } - - conn.close() - return config_dict - except Exception as e: - print(f"Failed to query configurations: {e}") - return {} - - -def run_benchmark(case, db_config, benchmark_description): - base_command = [ - "vectordbbench", "pgvectorhnsw", - "--user-name", db_config['username'], - "--password", db_config['password'], - "--host", db_config['host'], - "--db-name", db_config['db-name'] - ] - - # Handle initial flags (no skip for the first ef_search) - if case.get("db-name", True): - base_command.append("--drop-old") - else: - base_command.append("--skip-drop-old") - - if case.get("load", True): - base_command.append("--load") - else: - base_command.append("--skip-load") - - if case.get("search-serial", True): - base_command.append("--search-serial") - else: - base_command.append("--skip-search-serial") - - if case.get("search-concurrent", True): - 
base_command.append("--search-concurrent") - else: - base_command.append("--skip-search-concurrent") - - if case.get("search-churn", True): - base_command.append("--search-churn") - else: - base_command.append("--skip-search-churn") - - base_command.extend([ - "--case-type", case["case-type"], - "--maintenance-work-mem", case["maintenance-work-mem"], - "--max-parallel-workers", str(case["max-parallel-workers"]), - "--ef-construction", str(case["ef-construction"]), - "--m", str(case["m"]), - "--k", str(case["k"]), - "--num-concurrency", case["num-concurrency"], - "--concurrency-duration", str(case["concurrency-duration"]), - "--p-churn", str(case["p-churn"]), - ]) - - run_count = case.get("run-count", 1) # Default to 1 if not specified - current_dir = os.path.dirname(os.path.abspath(__file__)) - - for run in range(run_count): - print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") - for i, ef_search in enumerate(case["ef-search"]): - command = base_command + ["--ef-search", str(ef_search)] - - if i > 0: - # Remove conflicting --drop-old and --load flags - command = [arg for arg in command if arg not in ["--drop-old", "--load"]] - # Add skip flags if they are not already in the command - if "--skip-drop-old" not in command: - command.append("--skip-drop-old") - if "--skip-load" not in command: - command.append("--skip-load") - - try: - random_number = random.randint(1, 100000) - print(f"Running command: {' '.join(command)}") - output_dir = os.path.join( - current_dir, - f"results/pgvector/hnsw/{case['db-label']}/{db_config['provider']}/{db_config['instance-type']}-{str(case['m'])}-{str(case['ef-construction'])}-{ef_search}-{case['case-type']}-{run}-{random_number}" - ) - os.environ["RESULTS_LOCAL_DIR"] = output_dir - - os.makedirs(output_dir, exist_ok=True) - - with open(f"{output_dir}/log.txt", 'w') as f: - with redirect_stdout(f): - print(f"Running benchmark: {benchmark_description}") - print(f"DB Instance Type: {db_config['instance-type']}") 
- print(f"DB Instance Provider: {db_config['provider']}") - print(f"DB enable-seqscan: {db_config['enable-seqscan']}") - for key, value in case.items(): - if key == "ef_search": - print(f"{key}: {ef_search}") - print(f"{key}: {value}") - print("Current PostgreSQL configurations:") - current_configs = query_configurations(db_config) - for key, value in current_configs.items(): - print(f"{key}: {value}") - get_stats(db_config) - f.flush() - pre_warm(db_config) - print(f"Running command: {' '.join(command)}") - f.flush() - - print("***********START***********") - start_time = time.time() - # Capture both stdout and stderr and write them to the log file - subprocess.run(command, check=True, stdout=f, stderr=f) - end_time = time.time() - execution_time = end_time - start_time - print(f"total_duration={execution_time}") - print("***********END***********") - with redirect_stdout(f): - get_stats(db_config) - f.flush() - f.flush() - except subprocess.CalledProcessError as e: - print(f"Benchmark failed: {e}") - print("Sleeping for 1 min") - time.sleep(60) - -def main(): - current_dir = os.path.dirname(os.path.abspath(__file__)) - config_path = os.path.join(current_dir, "config.json") - config = load_config(config_path) - start_time = time.time() - for case in config['cases']: - print(f"Running case: {case['db-label']}") - setup_database(config) - - run_benchmark(case, config['database'], config['benchmark-description']) - teardown_database(config) - end_time = time.time() - execution_time = end_time - start_time - print(f"COMPLETED ALL EXECUTIONS. 
total_duration={execution_time}") - -if __name__ == "__main__": - main() - diff --git a/run-scripts/run-hnsw.py b/run-scripts/run-hnsw.py deleted file mode 100644 index 9242dc38..00000000 --- a/run-scripts/run-hnsw.py +++ /dev/null @@ -1,277 +0,0 @@ -import json -import time -from contextlib import redirect_stdout -import random -import subprocess -import psycopg2 -from psycopg2 import sql -import os - -os.environ["LOG_LEVEL"] = "DEBUG" - -def load_config(json_file): - with open(json_file, 'r') as file: - config = json.load(file) - return config - -def setup_database(config): - try: - conn = psycopg2.connect( - dbname='postgres', - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - conn.autocommit = True - cursor = conn.cursor() - # Create the database if it doesn't exist - cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db-name']]) - if not cursor.fetchone(): - cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db-name']))) - conn.close() - - # Connect to the new database to create the extension - conn = psycopg2.connect( - dbname=config['database']['db-name'], - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - cursor = conn.cursor() - cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") - conn.commit() - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - -def get_stats(config): - with open('queries.json', 'r') as file: - queries = json.load(file) - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cur = conn.cursor() - for item in queries: - query = item['query'] - description = item['description'] - 
print(f"\nRunning query: {description}") - try: - cur.execute(query) - rows = cur.fetchall() - headers = [desc[0] for desc in cur.description] - print(f"{' | '.join(headers)}") - for row in rows: - print(f"{' | '.join(map(str, row))}") - except Exception as e: - print(f"Failed to run query: {e}") - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - finally: - conn.close() - -def pre_warm(config): - print(f"Running pre warm for database:{config['db-name']}") - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'], - ) - cursor = conn.cursor() - cursor.execute("SELECT pg_prewarm('public.pgvector_index') as block_loaded") - conn.commit() - - result = cursor.fetchone() - print(f"Pre-warm blocks loaded: {result[0]}") - conn.close() - except Exception as e: - print(f"Failed to pre-warm the database: {e}") - -def teardown_database(config): - # Optionally drop the database after the test - pass - -def query_configurations(config): - # List of configuration parameters to query - config_queries = [ - "SHOW checkpoint_timeout;", - "SHOW effective_cache_size;", - "SHOW jit;", - "SHOW maintenance_work_mem;", - "SHOW max_parallel_maintenance_workers;", - "SHOW max_parallel_workers;", - "SHOW max_parallel_workers_per_gather;", - "SHOW max_wal_size;", - "SHOW max_worker_processes;", - "SHOW shared_buffers;", - "SHOW wal_compression;", - "SHOW work_mem;" - ] - - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cursor = conn.cursor() - results = [] - - # Execute each query and collect the result - for query in config_queries: - cursor.execute(query) - result = cursor.fetchone() - results.append(result[0] if result else None) - - # Print the raw output to debug - print("Raw query results:", results) - - config_dict = { - "checkpoint_timeout": results[0], - "effective_cache_size": 
results[1], - "jit": results[2], - "maintenance_work_mem": results[3], - "max_parallel_maintenance_workers": results[4], - "max_parallel_workers": results[5], - "max_parallel_workers_per_gather": results[6], - "max_wal_size": results[7], - "max_worker_processes": results[8], - "shared_buffers": results[9], - "wal_compression": results[10], - "work_mem": results[11] - } - - conn.close() - return config_dict - except Exception as e: - print(f"Failed to query configurations: {e}") - return {} - - -def run_benchmark(case, db_config, benchmark_description): - base_command = [ - "vectordbbench", "pgvectorhnsw", - "--user-name", db_config['username'], - "--password", db_config['password'], - "--host", db_config['host'], - "--db-name", db_config['db-name'] - ] - - # Handle initial flags (no skip for the first ef_search) - if case.get("db-name", True): - base_command.append("--drop-old") - else: - base_command.append("--skip-drop-old") - - if case.get("load", True): - base_command.append("--load") - else: - base_command.append("--skip-load") - - if case.get("search-serial", True): - base_command.append("--search-serial") - else: - base_command.append("--skip-search-serial") - - if case.get("search-concurrent", True): - base_command.append("--search-concurrent") - else: - base_command.append("--skip-search-concurrent") - - base_command.extend([ - "--case-type", case["case-type"], - "--maintenance-work-mem", case["maintenance-work-mem"], - "--max-parallel-workers", str(case["max-parallel-workers"]), - "--ef-construction", str(case["ef-construction"]), - "--m", str(case["m"]), - "--k", str(case["k"]), - "--num-concurrency", case["num-concurrency"], - "--concurrency-duration", str(case["concurrency-duration"]) - ]) - - run_count = case.get("run-count", 1) # Default to 1 if not specified - - for run in range(run_count): - print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") - for i, ef_search in enumerate(case["ef-search"]): - command = base_command + 
["--ef-search", str(ef_search)] - - if i > 0 or run > 0: - # Remove conflicting --drop-old and --load flags - command = [arg for arg in command if arg not in ["--drop-old", "--load"]] - # Add skip flags if they are not already in the command - if "--skip-drop-old" not in command: - command.append("--skip-drop-old") - if "--skip-load" not in command: - command.append("--skip-load") - - try: - random_number = random.randint(1, 100000) - print(f"Running command: {' '.join(command)}") - output_dir = f"results/pgvector/hnsw/{case['db-label']}/{db_config['provider']}/{db_config['instance-type']}-{str(case['m'])}-{str(case['ef-construction'])}-{ef_search}-{case['case-type']}-{run}-{random_number}" - os.environ["RESULTS_LOCAL_DIR"] = output_dir - - os.makedirs(output_dir, exist_ok=True) - - with open(f"{output_dir}/log.txt", 'w') as f: - with redirect_stdout(f): - print(f"Running benchmark: {benchmark_description}") - print(f"DB Instance Type: {db_config['instance-type']}") - print(f"DB Instance Provider: {db_config['provider']}") - print(f"DB enable-seqscan: {db_config['enable-seqscan']}") - for key, value in case.items(): - if key == "ef_search": - print(f"{key}: {ef_search}") - print(f"{key}: {value}") - print("Current PostgreSQL configurations:") - current_configs = query_configurations(db_config) - for key, value in current_configs.items(): - print(f"{key}: {value}") - get_stats(db_config) - f.flush() - pre_warm(db_config) - print(f"Running command: {' '.join(command)}") - f.flush() - - print("***********START***********") - start_time = time.time() - # Capture both stdout and stderr and write them to the log file - subprocess.run(command, check=True, stdout=f, stderr=f) - end_time = time.time() - execution_time = end_time - start_time - print(f"total_duration={execution_time}") - print("***********END***********") - with redirect_stdout(f): - get_stats(db_config) - f.flush() - f.flush() - except subprocess.CalledProcessError as e: - print(f"Benchmark failed: {e}") - 
print("Sleeping for 1 min") - time.sleep(60) - -def main(): - config = load_config("config.json") - start_time = time.time() - for case in config['cases']: - print(f"Running case: {case['db-label']}") - setup_database(config) - - run_benchmark(case, config['database'], config["benchmark-description"]) - teardown_database(config) - end_time = time.time() - execution_time = end_time - start_time - print(f"COMPLETED ALL EXECUTIONS. total_duration={execution_time}") - -if __name__ == "__main__": - main() - diff --git a/run-scripts/run-pvs-plain.py b/run-scripts/run-pvs-plain.py deleted file mode 100644 index 3770dd54..00000000 --- a/run-scripts/run-pvs-plain.py +++ /dev/null @@ -1,295 +0,0 @@ -import json -import time -from contextlib import redirect_stdout -import random -import subprocess -import psycopg2 -from psycopg2 import sql -import os - -os.environ["LOG_LEVEL"] = "DEBUG" - -def load_config(json_file): - with open(json_file, 'r') as file: - config = json.load(file) - return config - -def get_stats(config): - with open('queries.json', 'r') as file: - queries = json.load(file) - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cur = conn.cursor() - for item in queries: - query = item['query'] - description = item['description'] - print(f"\nRunning query: {description}") - try: - cur.execute(query) - rows = cur.fetchall() - headers = [desc[0] for desc in cur.description] - print(f"{' | '.join(headers)}") - for row in rows: - print(f"{' | '.join(map(str, row))}") - except Exception as e: - print(f"Failed to run query: {e}") - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - finally: - conn.close() - -def pre_warm(config): - print(f"Running pre warm for database:{config['db-name']}") - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'], - ) - cursor = 
conn.cursor() - cursor.execute("SELECT pg_prewarm('public.pgvector_index') as block_loaded") - conn.commit() - - result = cursor.fetchone() - print(f"Pre-warm blocks loaded: {result[0]}") - conn.close() - except Exception as e: - print(f"Failed to pre-warm the database: {e}") - -def setup_database(config): - try: - conn = psycopg2.connect( - dbname='postgres', - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - conn.autocommit = True - cursor = conn.cursor() - # Create the database if it doesn't exist - cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db-name']]) - if not cursor.fetchone(): - cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db-name']))) - conn.close() - - # Connect to the new database to create the extension - conn = psycopg2.connect( - dbname=config['database']['db-name'], - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - cursor = conn.cursor() - cursor.execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") - conn.commit() - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - -def teardown_database(config): - # Optionally drop the database after the test - pass - -def query_configurations(config): - # List of configuration parameters to query - config_queries = [ - "SHOW checkpoint_timeout;", - "SHOW effective_cache_size;", - "SHOW jit;", - "SHOW maintenance_work_mem;", - "SHOW max_parallel_maintenance_workers;", - "SHOW max_parallel_workers;", - "SHOW max_parallel_workers_per_gather;", - "SHOW max_wal_size;", - "SHOW max_worker_processes;", - "SHOW shared_buffers;", - "SHOW wal_compression;", - "SHOW work_mem;" - ] - - try: - conn = psycopg2.connect( - 
dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cursor = conn.cursor() - results = [] - - # Execute each query and collect the result - for query in config_queries: - cursor.execute(query) - result = cursor.fetchone() - results.append(result[0] if result else None) - - # Print the raw output to debug - print("Raw query results:", results) - - config_dict = { - "checkpoint_timeout": results[0], - "effective_cache_size": results[1], - "jit": results[2], - "maintenance_work_mem": results[3], - "max_parallel_maintenance_workers": results[4], - "max_parallel_workers": results[5], - "max_parallel_workers_per_gather": results[6], - "max_wal_size": results[7], - "max_worker_processes": results[8], - "shared_buffers": results[9], - "wal_compression": results[10], - "work_mem": results[11] - } - - conn.close() - return config_dict - except Exception as e: - print(f"Failed to query configurations: {e}") - return {} - - -def run_benchmark(case, db_config, benchmark_description): - base_command = [ - "vectordbbench", "pgvectorscalediskann", - "--user-name", db_config['username'], - "--password", db_config['password'], - "--host", db_config['host'], - "--db-name", db_config['db-name'] - ] - - # Handle initial flags (no skip for the first ef_search) - if case.get("drop-old", True): - base_command.append("--drop-old") - else: - base_command.append("--skip-drop-old") - - if case.get("load", True): - base_command.append("--load") - else: - base_command.append("--skip-load") - - if case.get("search-serial", True): - base_command.append("--search-serial") - else: - base_command.append("--skip-search-serial") - - if case.get("search-concurrent", True): - base_command.append("--search-concurrent") - else: - base_command.append("--skip-search-concurrent") - - base_command.extend([ - "--case-type", case["case-type"], - "--maintenance-work-mem", case["maintenance-work-mem"], - "--max-parallel-workers", 
str(case["max-parallel-workers"]), - "--k", str(case["k"]), - "--num-concurrency", case["num-concurrency"], - "--concurrency-duration", str(case["concurrency-duration"]) - ]) - - # Add pgvectorscale-specific parameters (excluding query-time parameters here for iteration) - base_command.extend([ - "--storage-layout", case["storage-layout"], - "--num-neighbors", str(case["num-neighbors"]), - "--search-list-size", str(case["search-list-size"]), - "--max-alpha", str(case["max-alpha"]), - "--num-dimensions", str(case["num-dimensions"]) - ]) - - run_count = case.get("run-count", 1) # Default to 1 if not specified - - for run in range(run_count): - print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") - for i, query_search_list_size in enumerate(case["query-search-list-size"]): - command = base_command + [ - "--query-search-list-size", str(query_search_list_size), - ] - - if i > 0 or run > 0: - # Remove conflicting --drop-old and --load flags - command = [arg for arg in command if arg not in ["--drop-old", "--load"]] - # Add skip flags if they are not already in the command - if "--skip-drop-old" not in command: - command.append("--skip-drop-old") - if "--skip-load" not in command: - command.append("--skip-load") - - try: - random_number = random.randint(1, 100000) - print(f"Running command: {' '.join(command)}") - output_dir = ( - f"results/pgvectorscale/{case['db-label']}/{db_config['provider']}/" - f"{db_config['instance-type']}-" - f"{case['storage-layout']}-" - f"{case['num-neighbors']}-" - f"{case['search-list-size']}-" - f"{case['max-alpha']}-" - f"{query_search_list_size}-" - f"{case['case-type']}-{run}-{random_number}" - ) - os.environ["RESULTS_LOCAL_DIR"] = output_dir - - os.makedirs(output_dir, exist_ok=True) - - with open(f"{output_dir}/log.txt", 'w') as f: - with redirect_stdout(f): - print(f"Running benchmark: {benchmark_description}") - print(f"DB Instance Type: {db_config['instance-type']}") - print(f"DB Instance Provider: 
{db_config['provider']}") - print(f"DB enable-seqscan: {db_config['enable-seqscan']}") - for key, value in case.items(): - if key == "query-search-list-size": - print(f"{key}: {query_search_list_size}") - print(f"{key}: {value}") - print("Current PostgreSQL configurations:") - current_configs = query_configurations(db_config) - for key, value in current_configs.items(): - print(f"{key}: {value}") - get_stats(db_config) - f.flush() - pre_warm(db_config) - print(f"Running command: {' '.join(command)}") - f.flush() - - print("***********START***********") - start_time = time.time() - # Capture both stdout and stderr and write them to the log file - subprocess.run(command, check=True, stdout=f, stderr=f) - end_time = time.time() - execution_time = end_time - start_time - print(f"total_duration={execution_time}") - print("***********END***********") - with redirect_stdout(f): - get_stats(db_config) - f.flush() - f.flush() - except subprocess.CalledProcessError as e: - print(f"Benchmark failed: {e}") - print("Sleeping for 30 sec") - time.sleep(60) - -def main(): - config = load_config("config.json") - start_time = time.time() - for case in config['cases']: - print(f"Running case: {case['db-label']}") - setup_database(config) - - run_benchmark(case, config['database'], config['benchmark-description']) - teardown_database(config) - end_time = time.time() - execution_time = end_time - start_time - print(f"COMPLETED ALL EXECUTIONS. 
total_duration={execution_time}") - -if __name__ == "__main__": - main() - diff --git a/run-scripts/run-pvs-sbq.py b/run-scripts/run-pvs-sbq.py deleted file mode 100644 index 9da203d2..00000000 --- a/run-scripts/run-pvs-sbq.py +++ /dev/null @@ -1,300 +0,0 @@ -import json -import time -from contextlib import redirect_stdout -import random -import subprocess -import psycopg2 -from psycopg2 import sql -import os - -os.environ["LOG_LEVEL"] = "DEBUG" - -def load_config(json_file): - with open(json_file, 'r') as file: - config = json.load(file) - return config - -def get_stats(config): - with open('queries.json', 'r') as file: - queries = json.load(file) - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cur = conn.cursor() - for item in queries: - query = item['query'] - description = item['description'] - print(f"\nRunning query: {description}") - try: - cur.execute(query) - rows = cur.fetchall() - headers = [desc[0] for desc in cur.description] - print(f"{' | '.join(headers)}") - for row in rows: - print(f"{' | '.join(map(str, row))}") - except Exception as e: - print(f"Failed to run query: {e}") - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - finally: - conn.close() - -def pre_warm(config): - print(f"Running pre warm for database:{config['db-name']}") - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'], - ) - cursor = conn.cursor() - cursor.execute("SELECT pg_prewarm('public.pgvector_index') as block_loaded") - conn.commit() - - result = cursor.fetchone() - print(f"Pre-warm blocks loaded: {result[0]}") - conn.close() - except Exception as e: - print(f"Failed to pre-warm the database: {e}") - -def setup_database(config): - try: - conn = psycopg2.connect( - dbname='postgres', - user=config['database']['username'], - password=config['database']['password'], - 
host=config['database']['host'] - ) - conn.autocommit = True - cursor = conn.cursor() - # Create the database if it doesn't exist - cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db-name']]) - if not cursor.fetchone(): - cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db-name']))) - conn.close() - - # Connect to the new database to create the extension - conn = psycopg2.connect( - dbname=config['database']['db-name'], - user=config['database']['username'], - password=config['database']['password'], - host=config['database']['host'] - ) - cursor = conn.cursor() - cursor.execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") - cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") - conn.commit() - conn.close() - except Exception as e: - print(f"Setup failed: {e}") - -def teardown_database(config): - # Optionally drop the database after the test - pass - -def query_configurations(config): - # List of configuration parameters to query - config_queries = [ - "SHOW checkpoint_timeout;", - "SHOW effective_cache_size;", - "SHOW jit;", - "SHOW maintenance_work_mem;", - "SHOW max_parallel_maintenance_workers;", - "SHOW max_parallel_workers;", - "SHOW max_parallel_workers_per_gather;", - "SHOW max_wal_size;", - "SHOW max_worker_processes;", - "SHOW shared_buffers;", - "SHOW wal_compression;", - "SHOW work_mem;" - ] - - try: - conn = psycopg2.connect( - dbname=config['db-name'], - user=config['username'], - password=config['password'], - host=config['host'] - ) - cursor = conn.cursor() - results = [] - - # Execute each query and collect the result - for query in config_queries: - cursor.execute(query) - result = cursor.fetchone() - results.append(result[0] if result else None) - - # Print the raw output to debug - print("Raw query results:", results) - - config_dict = { - "checkpoint_timeout": results[0], - 
"effective_cache_size": results[1], - "jit": results[2], - "maintenance_work_mem": results[3], - "max_parallel_maintenance_workers": results[4], - "max_parallel_workers": results[5], - "max_parallel_workers_per_gather": results[6], - "max_wal_size": results[7], - "max_worker_processes": results[8], - "shared_buffers": results[9], - "wal_compression": results[10], - "work_mem": results[11] - } - - conn.close() - return config_dict - except Exception as e: - print(f"Failed to query configurations: {e}") - return {} - - -def run_benchmark(case, db_config, benchmark_description): - base_command = [ - "vectordbbench", "pgvectorscale", - "--user-name", db_config['username'], - "--password", db_config['password'], - "--host", db_config['host'], - "--db-name", db_config['db-name'] - ] - - # Handle initial flags (no skip for the first ef_search) - if case.get("db-name", True): - base_command.append("--drop-old") - else: - base_command.append("--skip-drop-old") - - if case.get("load", True): - base_command.append("--load") - else: - base_command.append("--skip-load") - - if case.get("search-serial", True): - base_command.append("--search-serial") - else: - base_command.append("--skip-search-serial") - - if case.get("search-concurrent", True): - base_command.append("--search-concurrent") - else: - base_command.append("--skip-search-concurrent") - - base_command.extend([ - "--case-type", case["case-type"], - "--maintenance-work-mem", case["maintenance-work-mem"], - "--max-parallel-workers", str(case["max-parallel-workers"]), - "--k", str(case["k"]), - "--num-concurrency", case["num-concurrency"], - "--concurrency-duration", str(case["concurrency-duration"]) - ]) - - # Add pgvectorscale-specific parameters (excluding query-time parameters here for iteration) - base_command.extend([ - "--storage-layout", case["storage-layout"], - "--num-neighbors", str(case["num-neighbors"]), - "--search-list-size", str(case["search-list-size"]), - "--max-alpha", str(case["max-alpha"]), - 
"--num-dimensions", str(case["num-dimensions"]) - ]) - - run_count = case.get("run-count", 1) # Default to 1 if not specified - - for run in range(run_count): - print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") - for i, query_search_list_size in enumerate(case["query-search-list-size"]): - for j, query_rescore in enumerate(case["query-rescore"]): - command = base_command + [ - "--query-search-list-size", str(query_search_list_size), - "--query-rescore", str(query_rescore) - ] - - if i > 0 or j > 0 or run > 0: - # Remove conflicting --drop-old and --load flags - command = [arg for arg in command if arg not in ["--drop-old", "--load"]] - # Add skip flags if they are not already in the command - if "--skip-drop-old" not in command: - command.append("--skip-drop-old") - if "--skip-load" not in command: - command.append("--skip-load") - - try: - random_number = random.randint(1, 100000) - print(f"Running command: {' '.join(command)}") - output_dir = ( - f"results/pgvectorscale/{case['db-label']}/{db_config['provider']}/" - f"{db_config['instance-type']}-" - f"{case['storage-layout']}-" - f"{case['num-neighbors']}-" - f"{case['search-list-size']}-" - f"{case['max-alpha']}-" - f"{query_search_list_size}-" - f"{query_rescore}-" - f"{case['case-type']}-{run}-{random_number}" - ) - os.environ["RESULTS_LOCAL_DIR"] = output_dir - - os.makedirs(output_dir, exist_ok=True) - - with open(f"{output_dir}/log.txt", 'w') as f: - with redirect_stdout(f): - print(f"Running benchmark: {benchmark_description}") - print(f"DB Instance Type: {db_config['instance-type']}") - print(f"DB Instance Provider: {db_config['provider']}") - print(f"DB enable-seqscan: {db_config['enable-seqscan']}") - for key, value in case.items(): - if key == "query-search-list-size": - print(f"{key}: {query_search_list_size}") - if key == "query-rescore": - print(f"{key}: {query_rescore}") - print(f"{key}: {value}") - print("Current PostgreSQL configurations:") - current_configs = 
query_configurations(db_config) - for key, value in current_configs.items(): - print(f"{key}: {value}") - get_stats(db_config) - f.flush() - pre_warm(db_config) - print(f"Running command: {' '.join(command)}") - f.flush() - - print("***********START***********") - start_time = time.time() - # Capture both stdout and stderr and write them to the log file - subprocess.run(command, check=True, stdout=f, stderr=f) - end_time = time.time() - execution_time = end_time - start_time - print(f"total_duration={execution_time}") - print("***********END***********") - with redirect_stdout(f): - get_stats(db_config) - f.flush() - f.flush() - except subprocess.CalledProcessError as e: - print(f"Benchmark failed: {e}") - print("Sleeping for 30 sec") - time.sleep(60) - -def main(): - config = load_config("config.json") - start_time = time.time() - for case in config['cases']: - print(f"Running case: {case['db-label']}") - setup_database(config) - - run_benchmark(case, config['database'], config['benchmark-description']) - teardown_database(config) - end_time = time.time() - execution_time = end_time - start_time - print(f"COMPLETED ALL EXECUTIONS. 
total_duration={execution_time}") - -if __name__ == "__main__": - main() - From 38a1365e536249b2ebe4c566b5b0392c847959b8 Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Mon, 23 Dec 2024 23:08:03 +0500 Subject: [PATCH 5/7] Added pgvectorscale config in config.json --- config.json | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/config.json b/config.json index 66e365b5..80835415 100644 --- a/config.json +++ b/config.json @@ -65,7 +65,7 @@ "run_count": 2 }, { - "db-label": "filtered-search", + "db-label": "diskann-filtered-search", "vdb-command": "pgdiskann", "vector-ext": "pgdiskann", "index-type": "diskann", @@ -87,6 +87,33 @@ "concurrency-duration": 30, "k": 10, "run-count": 2 + }, + { + "db-label": "pgvectorscale-filtered-search", + "vdb-command": "pgvectorscalediskann", + "vector-ext": "pgvectorscale", + "index-type": "diskann", + "drop-old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "index-params": { + "num-neighbors": 32, + "max-alpha": 1.2, + "num-dimensions": 0, + "storage-layout": "plain", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7 + }, + "search-params": { + "search-list-size": [64], + "query-search-list-size": [64] + }, + "case-type": "Performance1536D500K1P", + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run-count": 2 } ] } From 7f442f9680fe212accf97e21fa59b9a7a9c0ab52 Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Mon, 23 Dec 2024 23:30:42 +0500 Subject: [PATCH 6/7] Fixed error in num-concurrency value --- config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.json b/config.json index 80835415..f77fd8d1 100644 --- a/config.json +++ b/config.json @@ -32,7 +32,7 @@ "search-params": { "ef-search": [40, 60] }, - "num-concurrency": "1,10,20,30,40,50,60,70.80,90,100", + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", "concurrency-duration": 30, "k": 10, 
"run-count": 2 From 78ccea03c4cdac0a18b6b53304ea1c26eca67909 Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Tue, 24 Dec 2024 16:32:59 +0500 Subject: [PATCH 7/7] Added create extension command for pg_diskann and vectorscale. Added condition to skip metadata generation when doing a dry run. --- run.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/run.py b/run.py index 245fa029..4ffb8d81 100644 --- a/run.py +++ b/run.py @@ -50,9 +50,19 @@ def setup_database(config): host=config['database']['host'] ) cursor = conn.cursor() - cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") + cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") + try: + cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_diskann;") + except Exception as e: + logger.error(f"Installing pgdiskann extension failed: {e}") + + try: + cursor.execute("CREATE EXTENSION IF NOT EXISTS vectorscale;") + except Exception as e: + logger.error(f"Installing vectorscale extension failed: {e}") + conn.commit() conn.close() except Exception as e: @@ -404,7 +414,8 @@ def main(): end_timeh = time.strftime('%Y-%m-%d %H:%M:%S') output_dir = get_output_dir_path(case, benchmark_info, [], 0, db_config=config['database'], base_dir=True) - generate_benchmark_metadata(config, start_timeh, end_timeh, output_dir) + if not args.dry_run: + generate_benchmark_metadata(config, start_timeh, end_timeh, output_dir) end_time = time.time() execution_time = end_time - start_time