From 4bfce62528469ca3fb807cd26b3d28cb7b122d1b Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Mon, 25 Nov 2024 22:16:17 +0500 Subject: [PATCH 1/3] Added AlloyDB client --- README.md | 3 +- pyproject.toml | 1 + vectordb_bench/backend/clients/__init__.py | 13 + .../backend/clients/alloydb/alloydb.py | 377 ++++++++++++++++++ vectordb_bench/backend/clients/alloydb/cli.py | 147 +++++++ .../backend/clients/alloydb/config.py | 168 ++++++++ vectordb_bench/backend/clients/api.py | 2 + vectordb_bench/cli/vectordbbench.py | 2 + .../frontend/config/dbCaseConfigs.py | 166 ++++++++ vectordb_bench/models.py | 9 + 10 files changed, 887 insertions(+), 1 deletion(-) create mode 100644 vectordb_bench/backend/clients/alloydb/alloydb.py create mode 100644 vectordb_bench/backend/clients/alloydb/cli.py create mode 100644 vectordb_bench/backend/clients/alloydb/config.py diff --git a/README.md b/README.md index bc550a7f2..9a109146e 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,12 @@ All the database client supported | pgvector | `pip install vectordb-bench[pgvector]` | | pgvecto.rs | `pip install vectordb-bench[pgvecto_rs]` | | pgvectorscale | `pip install vectordb-bench[pgvectorscale]` | -| pgdiskann | `pip install vectordb-bench[pgdiskann]` | +| pgdiskann | `pip install vectordb-bench[pgdiskann]` | | redis | `pip install vectordb-bench[redis]` | | memorydb | `pip install vectordb-bench[memorydb]` | | chromadb | `pip install vectordb-bench[chromadb]` | | awsopensearch | `pip install vectordb-bench[awsopensearch]` | +| alloydb | `pip install vectordb-bench[alloydb]` | ### Run diff --git a/pyproject.toml b/pyproject.toml index 000800389..760404b1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,7 @@ elastic = [ "elasticsearch" ] pgvector = [ "psycopg", "psycopg-binary", "pgvector" ] pgvectorscale = [ "psycopg", "psycopg-binary", "pgvector" ] pgdiskann = [ "psycopg", "psycopg-binary", "pgvector" ] +alloydb = [ "psycopg", "psycopg-binary", "pgvector"] pgvecto_rs = [ "pgvecto_rs[psycopg3]>=0.2.2" ] redis = [ "redis" ] memorydb = [ "memorydb" ] diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py index c26aa3d6d..8859381b5 100644 --- a/vectordb_bench/backend/clients/__init__.py +++ b/vectordb_bench/backend/clients/__init__.py @@ -32,6 +32,7 @@ class DB(Enum): PgVectoRS = "PgVectoRS" PgVectorScale = "PgVectorScale" PgDiskANN = "PgDiskANN" + AlloyDB = "AlloyDB" Redis = "Redis" MemoryDB = "MemoryDB" Chroma = "Chroma" @@ -97,6 +98,10 @@ def init_cls(self) -> Type[VectorDB]: if self == DB.AWSOpenSearch: from .aws_opensearch.aws_opensearch import AWSOpenSearch return AWSOpenSearch + + if self == DB.AlloyDB: + from .alloydb.alloydb import AlloyDB + return AlloyDB @property def config_cls(self) -> Type[DBConfig]: @@ -156,6 +161,10 @@ def config_cls(self) -> Type[DBConfig]: if self == DB.AWSOpenSearch: from .aws_opensearch.config import AWSOpenSearchConfig return AWSOpenSearchConfig + + if self == DB.AlloyDB: + from .alloydb.config import AlloyDBConfig + return AlloyDBConfig def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseConfig]: if self == DB.Milvus: @@ -197,6 +206,10 @@ def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseCon if self == DB.PgDiskANN: from .pgdiskann.config import _pgdiskann_case_config return _pgdiskann_case_config.get(index_type) + + if self == DB.AlloyDB: + from .alloydb.config import _alloydb_case_config + return _alloydb_case_config.get(index_type) # DB.Pinecone, DB.Chroma, DB.Redis return EmptyDBCaseConfig diff --git a/vectordb_bench/backend/clients/alloydb/alloydb.py b/vectordb_bench/backend/clients/alloydb/alloydb.py new file mode 100644 index 000000000..c053c2fad --- /dev/null +++ b/vectordb_bench/backend/clients/alloydb/alloydb.py @@ -0,0 +1,377 @@ +"""Wrapper around the alloydb vector database over VectorDB""" + +import logging +import pprint +from contextlib import contextmanager +from typing import Any, Generator, Optional, Tuple, Sequence + +import numpy as np +import psycopg +from pgvector.psycopg import register_vector +from psycopg import Connection, Cursor, sql + +from ..api import VectorDB +from .config import AlloyDBConfigDict, AlloyDBIndexConfig, AlloyDBScaNNConfig + +log = logging.getLogger(__name__) + + +class AlloyDB(VectorDB): + """Use psycopg instructions""" + + conn: psycopg.Connection[Any] | None = None + cursor: psycopg.Cursor[Any] | None = None + + _filtered_search: sql.Composed + _unfiltered_search: sql.Composed + + def __init__( + self, + dim: int, + db_config: AlloyDBConfigDict, + db_case_config: AlloyDBIndexConfig, + collection_name: str = "alloydb_collection", + drop_old: bool = False, + **kwargs, + ): + self.name = "AlloyDB" + self.db_config = db_config + self.case_config = db_case_config + self.table_name = collection_name + self.dim = dim + + self._index_name = "alloydb_index" + self._primary_field = "id" + self._vector_field = "embedding" + + # construct basic units + self.conn, self.cursor = self._create_connection(**self.db_config) + + # create vector extension + self.cursor.execute("CREATE EXTENSION IF NOT EXISTS alloydb_scann CASCADE") + self.conn.commit() + + log.info(f"{self.name} config values: {self.db_config}\n{self.case_config}") + if not any( + ( + self.case_config.create_index_before_load, + self.case_config.create_index_after_load, + ) + ): + err = f"{self.name} config must create an index using create_index_before_load or create_index_after_load" + log.error(err) + raise RuntimeError( + f"{err}\n{pprint.pformat(self.db_config)}\n{pprint.pformat(self.case_config)}" + ) + + if drop_old: + self._drop_index() + self._drop_table() + self._create_table(dim) + if self.case_config.create_index_before_load: + self._create_index() + + self.cursor.close() + self.conn.close() + self.cursor = None + self.conn = None + + @staticmethod + def _create_connection(**kwargs) -> Tuple[Connection, Cursor]: + conn = psycopg.connect(**kwargs) + register_vector(conn) + conn.autocommit = False + cursor = conn.cursor() + + assert conn is not None, "Connection is not initialized" + assert cursor is not None, "Cursor is not initialized" + return conn, cursor + + def _generate_search_query(self, filtered: bool=False) -> sql.Composed: + search_query = sql.Composed( + [ + sql.SQL( + "SELECT id FROM public.{table_name} {where_clause} ORDER BY embedding " + ).format( + table_name=sql.Identifier(self.table_name), + where_clause=sql.SQL("WHERE id >= %s") if filtered else sql.SQL(""), + ), + sql.SQL(self.case_config.search_param()["metric_fun_op"]), + sql.SQL(" %s::vector LIMIT %s::int"), + ] + ) + return search_query + + @contextmanager + def init(self) -> Generator[None, None, None]: + """ + Examples: + >>> with self.init(): + >>> self.insert_embeddings() + >>> self.search_embedding() + """ + + self.conn, self.cursor = self._create_connection(**self.db_config) + + # index configuration may have commands defined that we should set during each client session + session_options: Sequence[dict[str, Any]] = self.case_config.session_param()["session_options"] + + if len(session_options) > 0: + for setting in session_options: + command = sql.SQL("SET {setting_name} " + "= {val};").format( + setting_name=sql.Identifier(setting['parameter']['setting_name']), + val=sql.Identifier(str(setting['parameter']['val'])), + ) + log.debug(command.as_string(self.cursor)) + self.cursor.execute(command) + self.conn.commit() + + self._filtered_search = self._generate_search_query(filtered=True) + self._unfiltered_search = self._generate_search_query() + + try: + yield + finally: + self.cursor.close() + self.conn.close() + self.cursor = None + self.conn = None + + def _drop_table(self): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client drop table : {self.table_name}") + + self.cursor.execute( + sql.SQL("DROP TABLE IF EXISTS public.{table_name}").format( + table_name=sql.Identifier(self.table_name) + ) + ) + self.conn.commit() + + def ready_to_load(self): + pass + + def optimize(self): + self._post_insert() + + def _post_insert(self): + log.info(f"{self.name} post insert before optimize") + if self.case_config.create_index_after_load: + self._drop_index() + self._create_index() + + def _drop_index(self): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client drop index : {self._index_name}") + + drop_index_sql = sql.SQL("DROP INDEX IF EXISTS {index_name}").format( + index_name=sql.Identifier(self._index_name) + ) + log.debug(drop_index_sql.as_string(self.cursor)) + self.cursor.execute(drop_index_sql) + self.conn.commit() + + def _set_parallel_index_build_param(self): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + index_param = self.case_config.index_param() + + if index_param["enable_pca"] is not None: + self.cursor.execute( + sql.SQL("SET scann.enable_pca TO {};").format( + index_param["enable_pca"] + ) + ) + self.cursor.execute( + sql.SQL("ALTER USER {} SET scann.enable_pca TO {};").format( + sql.Identifier(self.db_config["user"]), + index_param["enable_pca"], + ) + ) + self.conn.commit() + + if index_param["maintenance_work_mem"] is not None: + self.cursor.execute( + sql.SQL("SET maintenance_work_mem TO {};").format( + index_param["maintenance_work_mem"] + ) + ) + self.cursor.execute( + sql.SQL("ALTER USER {} SET maintenance_work_mem TO {};").format( + sql.Identifier(self.db_config["user"]), + index_param["maintenance_work_mem"], + ) + ) + self.conn.commit() + + if index_param["max_parallel_workers"] is not None: + self.cursor.execute( + sql.SQL("SET max_parallel_maintenance_workers TO '{}';").format( + index_param["max_parallel_workers"] + ) + ) + self.cursor.execute( + sql.SQL( + "ALTER USER {} SET max_parallel_maintenance_workers TO '{}';" + ).format( + sql.Identifier(self.db_config["user"]), + index_param["max_parallel_workers"], + ) + ) + self.cursor.execute( + sql.SQL("SET max_parallel_workers TO '{}';").format( + index_param["max_parallel_workers"] + ) + ) + self.cursor.execute( + sql.SQL( + "ALTER USER {} SET max_parallel_workers TO '{}';" + ).format( + sql.Identifier(self.db_config["user"]), + index_param["max_parallel_workers"], + ) + ) + self.cursor.execute( + sql.SQL( + "ALTER TABLE {} SET (parallel_workers = {});" + ).format( + sql.Identifier(self.table_name), + index_param["max_parallel_workers"], + ) + ) + self.conn.commit() + + results = self.cursor.execute( + sql.SQL("SHOW max_parallel_maintenance_workers;") + ).fetchall() + results.extend( + self.cursor.execute(sql.SQL("SHOW max_parallel_workers;")).fetchall() + ) + results.extend( + self.cursor.execute(sql.SQL("SHOW maintenance_work_mem;")).fetchall() + ) + log.info(f"{self.name} parallel index creation parameters: {results}") + + def _create_index(self): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client create index : {self._index_name}") + + index_param = self.case_config.index_param() + self._set_parallel_index_build_param() + options = [] + for option in index_param["index_creation_with_options"]: + if option['val'] is not None: + options.append( + sql.SQL("{option_name} = {val}").format( + option_name=sql.Identifier(option['option_name']), + val=sql.Identifier(str(option['val'])), + ) + ) + if any(options): + with_clause = sql.SQL("WITH ({});").format(sql.SQL(", ").join(options)) + else: + with_clause = sql.Composed(()) + + index_create_sql = sql.SQL( + """ + CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name} + USING {index_type} (embedding {embedding_metric}) + """ + ).format( + index_name=sql.Identifier(self._index_name), + table_name=sql.Identifier(self.table_name), + index_type=sql.Identifier(index_param["index_type"]), + embedding_metric=sql.Identifier(index_param["metric"]), + ) + + index_create_sql_with_with_clause = ( + index_create_sql + with_clause + ).join(" ") + log.debug(index_create_sql_with_with_clause.as_string(self.cursor)) + self.cursor.execute(index_create_sql_with_with_clause) + self.conn.commit() + + def _create_table(self, dim: int): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + try: + log.info(f"{self.name} client create table : {self.table_name}") + + # create table + self.cursor.execute( + sql.SQL( + "CREATE TABLE IF NOT EXISTS public.{table_name} (id BIGINT PRIMARY KEY, embedding vector({dim}));" + ).format(table_name=sql.Identifier(self.table_name), dim=dim) + ) + self.cursor.execute( + sql.SQL( + "ALTER TABLE public.{table_name} ALTER COLUMN embedding SET STORAGE PLAIN;" + ).format(table_name=sql.Identifier(self.table_name)) + ) + self.conn.commit() + except Exception as e: + log.warning( + f"Failed to create alloydb table: {self.table_name} error: {e}" + ) + raise e from None + + def insert_embeddings( + self, + embeddings: list[list[float]], + metadata: list[int], + **kwargs: Any, + ) -> Tuple[int, Optional[Exception]]: + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + try: + metadata_arr = np.array(metadata) + embeddings_arr = np.array(embeddings) + + with self.cursor.copy( + sql.SQL("COPY public.{table_name} FROM STDIN (FORMAT BINARY)").format( + table_name=sql.Identifier(self.table_name) + ) + ) as copy: + copy.set_types(["bigint", "vector"]) + for i, row in enumerate(metadata_arr): + copy.write_row((row, embeddings_arr[i])) + self.conn.commit() + + if kwargs.get("last_batch"): + self._post_insert() + + return len(metadata), None + except Exception as e: + log.warning( + f"Failed to insert data into alloydb table ({self.table_name}), error: {e}" + ) + return 0, e + + def search_embedding( + self, + query: list[float], + k: int = 100, + filters: dict | None = None, + timeout: int | None = None, + ) -> list[int]: + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + q = np.asarray(query) + if filters: + gt = filters.get("id") + result = self.cursor.execute( + self._filtered_search, (gt, q, k), prepare=True, binary=True + ) + else: + result = self.cursor.execute( + self._unfiltered_search, (q, k), prepare=True, binary=True + ) + + return [int(i[0]) for i in result.fetchall()] diff --git a/vectordb_bench/backend/clients/alloydb/cli.py b/vectordb_bench/backend/clients/alloydb/cli.py new file mode 100644 index 000000000..ab49da955 --- /dev/null +++ b/vectordb_bench/backend/clients/alloydb/cli.py @@ -0,0 +1,147 @@ +from typing import Annotated, Optional, TypedDict, Unpack + +import click +import os +from pydantic import SecretStr + +from vectordb_bench.backend.clients.api import MetricType + +from ....cli.cli import ( + CommonTypedDict, + cli, + click_parameter_decorators_from_typed_dict, + get_custom_case_config, + run, +) +from vectordb_bench.backend.clients import DB + + +class AlloyDBTypedDict(CommonTypedDict): + user_name: Annotated[ + str, click.option("--user-name", type=str, help="Db username", required=True) + ] + password: Annotated[ + str, + click.option("--password", + type=str, + help="Postgres database password", + default=lambda: os.environ.get("POSTGRES_PASSWORD", ""), + show_default="$POSTGRES_PASSWORD", + ), + ] + + host: Annotated[ + str, click.option("--host", type=str, help="Db host", required=True) + ] + db_name: Annotated[ + str, click.option("--db-name", type=str, help="Db name", required=True) + ] + maintenance_work_mem: Annotated[ + Optional[str], + click.option( + "--maintenance-work-mem", + type=str, + help="Sets the maximum memory to be used for maintenance operations (index creation). " + "Can be entered as string with unit like '64GB' or as an integer number of KB." + "This will set the parameters: max_parallel_maintenance_workers," + " max_parallel_workers & table(parallel_workers)", + required=False, + ), + ] + max_parallel_workers: Annotated[ + Optional[int], + click.option( + "--max-parallel-workers", + type=int, + help="Sets the maximum number of parallel processes per maintenance operation (index creation)", + required=False, + ), + ] + + + +class AlloyDBScaNNTypedDict(AlloyDBTypedDict): + num_leaves: Annotated[ + int, + click.option("--num-leaves", type=int, help="Number of leaves", required=True) + ] + num_leaves_to_search: Annotated[ + int, + click.option("--num-leaves-to-search", type=int, help="Number of leaves to search", required=True) + ] + pre_reordering_num_neighbors: Annotated[ + Optional[int], + click.option("--pre-reordering-num-neighbors", type=int, help="Pre-reordering number of neighbors",) + ] + max_top_neighbors_buffer_size: Annotated[ + int, + click.option("--max-top-neighbors-buffer-size", type=int, help="Maximum top neighbors buffer size", default=20_000) + ] + num_search_threads: Annotated[ + int, + click.option("--num-search-threads", type=int, help="Number of search threads", default=2) + ] + max_num_prefetch_datasets: Annotated[ + int, + click.option("--max-num-prefetch-datasets", type=int, help="Maximum number of prefetch datasets", default=100) + ] + quantizer: Annotated[ + str, + click.option( + "--quantizer", + type=click.Choice(["SQ8", "FLAT"]), + help="Quantizer type", + default="SQ8" + ) + ] + enable_pca: Annotated[ + bool, click.option( + "--enable-pca", + type=click.Choice(["on", "off"]), + help="Enable PCA", + default="on" + ) + ] + max_num_levels: Annotated[ + int, + click.option( + "--max-num-levels", + type=click.Choice([1, 2]), + help="Maximum number of levels", + default=1 + ) + ] + + +@cli.command() +@click_parameter_decorators_from_typed_dict(AlloyDBScaNNTypedDict) +def AlloyDBScaNN( + **parameters: Unpack[AlloyDBScaNNTypedDict], +): + from .config import AlloyDBConfig, AlloyDBScaNNConfig + + parameters["custom_case"] = get_custom_case_config(parameters) + run( + db=DB.AlloyDB, + db_config=AlloyDBConfig( + db_label=parameters["db_label"], + user_name=SecretStr(parameters["user_name"]), + password=SecretStr(parameters["password"]), + host=parameters["host"], + db_name=parameters["db_name"], + ), + db_case_config=AlloyDBScaNNConfig( + num_leaves=parameters["num_leaves"], + quantizer=parameters["quantizer"], + enable_pca=parameters["enable_pca"], + max_num_levels=parameters["max_num_levels"], + num_leaves_to_search=parameters["num_leaves_to_search"], + max_top_neighbors_buffer_size=parameters["max_top_neighbors_buffer_size"], + pre_reordering_num_neighbors=parameters["pre_reordering_num_neighbors"], + num_search_threads=parameters["num_search_threads"], + max_num_prefetch_datasets=parameters["max_num_prefetch_datasets"], + max_parallel_workers=parameters["max_parallel_workers"], + maintenance_work_mem=parameters["maintenance_work_mem"], + ), + **parameters, + ) \ No newline at end of file diff --git a/vectordb_bench/backend/clients/alloydb/config.py b/vectordb_bench/backend/clients/alloydb/config.py new file mode 100644 index 000000000..1d5dde519 --- /dev/null +++ b/vectordb_bench/backend/clients/alloydb/config.py @@ -0,0 +1,168 @@ +from abc import abstractmethod +from typing import Any, Mapping, Optional, Sequence, TypedDict +from pydantic import BaseModel, SecretStr +from typing_extensions import LiteralString +from ..api import DBCaseConfig, DBConfig, IndexType, MetricType + +POSTGRE_URL_PLACEHOLDER = "postgresql://%s:%s@%s/%s" + + +class AlloyDBConfigDict(TypedDict): + """These keys will be directly used as kwargs in psycopg connection string, + so the names must match exactly psycopg API""" + + user: str + password: str + host: str + port: int + dbname: str + + +class AlloyDBConfig(DBConfig): + user_name: SecretStr = SecretStr("postgres") + password: SecretStr + host: str = "localhost" + port: int = 5432 + db_name: str + + def to_dict(self) -> AlloyDBConfigDict: + user_str = self.user_name.get_secret_value() + pwd_str = self.password.get_secret_value() + return { + "host": self.host, + "port": self.port, + "dbname": self.db_name, + "user": user_str, + "password": pwd_str, + } + + +class AlloyDBIndexParam(TypedDict): + metric: str + index_type: str + index_creation_with_options: Sequence[dict[str, Any]] + maintenance_work_mem: Optional[str] + max_parallel_workers: Optional[int] + + +class AlloyDBSearchParam(TypedDict): + metric_fun_op: LiteralString + + +class AlloyDBSessionCommands(TypedDict): + session_options: Sequence[dict[str, Any]] + + +class AlloyDBIndexConfig(BaseModel, DBCaseConfig): + metric_type: MetricType | None = None + create_index_before_load: bool = False + create_index_after_load: bool = True + + def parse_metric(self) -> str: + if self.metric_type == MetricType.L2: + return "l2" + elif self.metric_type == MetricType.DP: + return "dot_product" + return "cosine" + + def parse_metric_fun_op(self) -> LiteralString: + if self.metric_type == MetricType.L2: + return "<->" + elif self.metric_type == MetricType.IP: + return "<#>" + return "<=>" + + @abstractmethod + def index_param(self) -> AlloyDBIndexParam: + ... + + @abstractmethod + def search_param(self) -> AlloyDBSearchParam: + ... + + @abstractmethod + def session_param(self) -> AlloyDBSessionCommands: + ... + + @staticmethod + def _optionally_build_with_options(with_options: Mapping[str, Any]) -> Sequence[dict[str, Any]]: + """Walk through mappings, creating a List of {key1 = value} pairs. That will be used to build a where clause""" + options = [] + for option_name, value in with_options.items(): + if value is not None: + options.append( + { + "option_name": option_name, + "val": str(value), + } + ) + return options + + @staticmethod + def _optionally_build_set_options( + set_mapping: Mapping[str, Any] + ) -> Sequence[dict[str, Any]]: + """Walk through options, creating 'SET 'key1 = "value1";' list""" + session_options = [] + for setting_name, value in set_mapping.items(): + if value: + session_options.append( + {"parameter": { + "setting_name": setting_name, + "val": str(value), + }, + } + ) + return session_options + + +class AlloyDBScaNNConfig(AlloyDBIndexConfig): + index: IndexType = IndexType.SCANN + num_leaves: int | None + quantizer: str | None + enable_pca: str | None + max_num_levels: int | None + num_leaves_to_search: int | None + max_top_neighbors_buffer_size: int | None + pre_reordering_num_neighbors: int | None + num_search_threads: int | None + max_num_prefetch_datasets: int | None + maintenance_work_mem: Optional[str] = None + max_parallel_workers: Optional[int] = None + + def index_param(self) -> AlloyDBIndexParam: + index_parameters = { + "num_leaves": self.num_leaves, "max_num_levels": self.max_num_levels, "quantizer": self.quantizer, + } + return { + "metric": self.parse_metric(), + "index_type": self.index.value, + "index_creation_with_options": self._optionally_build_with_options( + index_parameters + ), + "maintenance_work_mem": self.maintenance_work_mem, + "max_parallel_workers": self.max_parallel_workers, + "enable_pca": self.enable_pca, + } + + def search_param(self) -> AlloyDBSearchParam: + return { + "metric_fun_op": self.parse_metric_fun_op(), + } + + def session_param(self) -> AlloyDBSessionCommands: + session_parameters = { + "scann.num_leaves_to_search": self.num_leaves_to_search, + "scann.max_top_neighbors_buffer_size": self.max_top_neighbors_buffer_size, + "scann.pre_reordering_num_neighbors": self.pre_reordering_num_neighbors, + "scann.num_search_threads": self.num_search_threads, + "scann.max_num_prefetch_datasets": self.max_num_prefetch_datasets, + } + return { + "session_options": self._optionally_build_set_options(session_parameters) + } + + +_alloydb_case_config = { + IndexType.SCANN: AlloyDBScaNNConfig, +} diff --git a/vectordb_bench/backend/clients/api.py b/vectordb_bench/backend/clients/api.py index 0c26fdd3b..da2bed089 100644 --- a/vectordb_bench/backend/clients/api.py +++ b/vectordb_bench/backend/clients/api.py @@ -10,6 +10,7 @@ class MetricType(str, Enum): L2 = "L2" COSINE = "COSINE" IP = "IP" + DP = "DP" HAMMING = "HAMMING" JACCARD = "JACCARD" @@ -27,6 +28,7 @@ class IndexType(str, Enum): GPU_IVF_FLAT = "GPU_IVF_FLAT" GPU_IVF_PQ = "GPU_IVF_PQ" GPU_CAGRA = "GPU_CAGRA" + SCANN = "scann" class DBConfig(ABC, BaseModel): diff --git a/vectordb_bench/cli/vectordbbench.py b/vectordb_bench/cli/vectordbbench.py index 4d23ed952..f9ad69ceb 100644 --- a/vectordb_bench/cli/vectordbbench.py +++ b/vectordb_bench/cli/vectordbbench.py @@ -9,6 +9,7 @@ from ..backend.clients.zilliz_cloud.cli import ZillizAutoIndex from ..backend.clients.milvus.cli import MilvusAutoIndex from ..backend.clients.aws_opensearch.cli import AWSOpenSearch +from ..backend.clients.alloydb.cli import AlloyDBScaNN from .cli import cli @@ -24,6 +25,7 @@ cli.add_command(AWSOpenSearch) cli.add_command(PgVectorScaleDiskAnn) cli.add_command(PgDiskAnn) +cli.add_command(AlloyDBScaNN) if __name__ == "__main__": diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py index 3db547fef..f8632105e 100644 --- a/vectordb_bench/frontend/config/dbCaseConfigs.py +++ b/vectordb_bench/frontend/config/dbCaseConfigs.py @@ -906,6 +906,141 @@ class CaseConfigInput(BaseModel): == "bit" and config.get(CaseConfigParamType.reranking, False) ) + +CaseConfigParamInput_IndexType_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.IndexType, + inputHelp="Select Index Type", + inputType=InputType.Option, + inputConfig={ + "options": [ + IndexType.SCANN.value, + ], + }, +) + +CaseConfigParamInput_num_leaves_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.numLeaves, + displayLabel="Num Leaves", + inputHelp="The number of partition to apply to this index", + inputType=InputType.Number, + inputConfig={ + "min": 1, + "max": 1048576, + "value": 200, + }, +) + +CaseConfigParamInput_quantizer_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.quantizer, + inputType=InputType.Option, + inputConfig={ + "options": ["SQ8", "Flat"], + }, +) + +CaseConfigParamInput_max_num_levels_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.maxNumLevels, + inputType=InputType.Option, + inputConfig={ + "options": [1, 2], + }, +) + +CaseConfigParamInput_enable_pca_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.enablePca, + inputType=InputType.Option, + inputConfig={ + "options": ["on", "off"], + }, +) + +CaseConfigParamInput_num_leaves_to_search_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.numLeavesToSearch, + displayLabel="Num leaves to search", + inputHelp="The database flag controls the trade off between recall and QPS", + inputType=InputType.Number, + inputConfig={ + "min": 20, + "max": 10486, + "value": 20, + }, +) + +CaseConfigParamInput_max_top_neighbors_buffer_size_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.maxTopNeighborsBufferSize, + displayLabel="Max top neighbors buffer size", + inputHelp="The database flag specifies the size of cache used to improve the \ + performance for filtered queries by scoring or ranking the scanned candidate \ + neighbors in memory instead of the disk", + inputType=InputType.Number, + inputConfig={ + "min": 10000, + "max": 60000, + "value": 20000, + }, +) + +CaseConfigParamInput_pre_reordering_num_neighbors_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.preReorderingNumNeigbors, + displayLabel="Pre reordering num neighbors", + inputHelp="Specifies the number of candidate neighbors to consider during the reordering \ + stages after initial search identifies a set of candidates", + inputType=InputType.Number, + inputConfig={ + "min": 20, + "max": 10486, + "value": 80, + }, +) + +CaseConfigParamInput_num_search_threads_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.numSearchThreads, + displayLabel="Num of searcher threads", + inputHelp="The number of searcher threads for multi-thread search.", + inputType=InputType.Number, + inputConfig={ + "min": 1, + "max": 100, + "value": 2, + }, +) + +CaseConfigParamInput_max_num_prefetch_datasets_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.maxNumPrefetchDatasets, + displayLabel="Max num prefetch datasets", + inputHelp="The maximum number of data batches to prefetch during index search, where batch is a group of buffer pages", + inputType=InputType.Number, + inputConfig={ + "min": 10, + "max": 150, + "value": 100, + }, +) + +CaseConfigParamInput_maintenance_work_mem_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.maintenance_work_mem, + inputHelp="Recommended value: 1.33x the index size, not to exceed the available free memory." + "Specify in gigabytes. e.g. 8GB", + inputType=InputType.Text, + inputConfig={ + "value": "8GB", + }, +) + +CaseConfigParamInput_max_parallel_workers_AlloyDB = CaseConfigInput( + label=CaseConfigParamType.max_parallel_workers, + displayLabel="Max parallel workers", + inputHelp="Recommended value: (cpu cores - 1). This will set the parameters: max_parallel_maintenance_workers," + " max_parallel_workers & table(parallel_workers)", + inputType=InputType.Number, + inputConfig={ + "min": 0, + "max": 1024, + "value": 7, + }, +) + + MilvusLoadConfig = [ CaseConfigParamInput_IndexType, CaseConfigParamInput_M, @@ -1045,6 +1180,33 @@ class CaseConfigInput(BaseModel): CaseConfigParamInput_l_value_is, ] + +AlloyDBLoadConfig = [ + CaseConfigParamInput_IndexType_AlloyDB, + CaseConfigParamInput_num_leaves_AlloyDB, + CaseConfigParamInput_max_num_levels_AlloyDB, + CaseConfigParamInput_enable_pca_AlloyDB, + CaseConfigParamInput_quantizer_AlloyDB, + CaseConfigParamInput_maintenance_work_mem_AlloyDB, + CaseConfigParamInput_max_parallel_workers_AlloyDB, +] + +AlloyDBPerformanceConfig = [ + CaseConfigParamInput_IndexType_AlloyDB, + CaseConfigParamInput_num_leaves_AlloyDB, + CaseConfigParamInput_max_num_levels_AlloyDB, + CaseConfigParamInput_enable_pca_AlloyDB, + CaseConfigParamInput_quantizer_AlloyDB, + CaseConfigParamInput_num_search_threads_AlloyDB, + CaseConfigParamInput_num_leaves_to_search_AlloyDB, + CaseConfigParamInput_max_num_prefetch_datasets_AlloyDB, + CaseConfigParamInput_max_top_neighbors_buffer_size_AlloyDB, + CaseConfigParamInput_pre_reordering_num_neighbors_AlloyDB, + CaseConfigParamInput_maintenance_work_mem_AlloyDB, + CaseConfigParamInput_max_parallel_workers_AlloyDB, +] + + CASE_CONFIG_MAP = { DB.Milvus: { CaseLabel.Load: MilvusLoadConfig, @@ -1081,4 +1243,8 @@ class CaseConfigInput(BaseModel): CaseLabel.Load: PgDiskANNLoadConfig, CaseLabel.Performance: PgDiskANNPerformanceConfig, }, + DB.AlloyDB: { + CaseLabel.Load: AlloyDBLoadConfig, + CaseLabel.Performance: AlloyDBPerformanceConfig, + }, } diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index 9e064c564..648fb1727 100644 --- a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -76,6 +76,15 @@ class CaseConfigParamType(Enum): num_bits_per_dimension = "num_bits_per_dimension" query_search_list_size = "query_search_list_size" query_rescore = "query_rescore" + numLeaves = "num_leaves" + quantizer = "quantizer" + enablePca = "enable_pca" + maxNumLevels = "max_num_levels" + numLeavesToSearch = "num_leaves_to_search" + maxTopNeighborsBufferSize = "max_top_neighbors_buffer_size" + preReorderingNumNeigbors = "pre_reordering_num_neighbors" + numSearchThreads = "num_search_threads" + maxNumPrefetchDatasets = "max_num_prefetch_datasets" class CustomizedCase(BaseModel): From f986f61fda8fbd24cb8e0cb795e38cb0ec2fc58c Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Mon, 25 Nov 2024 22:24:29 +0500 Subject: [PATCH 2/3] Remove query that set storage to plain. --- vectordb_bench/backend/clients/alloydb/alloydb.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/vectordb_bench/backend/clients/alloydb/alloydb.py b/vectordb_bench/backend/clients/alloydb/alloydb.py index c053c2fad..5b275b30f 100644 --- a/vectordb_bench/backend/clients/alloydb/alloydb.py +++ b/vectordb_bench/backend/clients/alloydb/alloydb.py @@ -308,11 +308,6 @@ def _create_table(self, dim: int): "CREATE TABLE IF NOT EXISTS public.{table_name} (id BIGINT PRIMARY KEY, embedding vector({dim}));" ).format(table_name=sql.Identifier(self.table_name), dim=dim) ) - self.cursor.execute( - sql.SQL( - "ALTER TABLE public.{table_name} ALTER COLUMN embedding SET STORAGE PLAIN;" - ).format(table_name=sql.Identifier(self.table_name)) - ) self.conn.commit() except Exception as e: log.warning( From a0b3d369f2e83b9ca30abacd49b886ff40a69548 Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Mon, 25 Nov 2024 22:29:11 +0500 Subject: [PATCH 3/3] Add default value for pre_reordering_num_neighbors in cli options. --- vectordb_bench/backend/clients/alloydb/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vectordb_bench/backend/clients/alloydb/cli.py b/vectordb_bench/backend/clients/alloydb/cli.py index ab49da955..aa3a5cbe8 100644 --- a/vectordb_bench/backend/clients/alloydb/cli.py +++ b/vectordb_bench/backend/clients/alloydb/cli.py @@ -70,8 +70,8 @@ class AlloyDBScaNNTypedDict(AlloyDBTypedDict): click.option("--num-leaves-to-search", type=int, help="Number of leaves to search", required=True) ] pre_reordering_num_neighbors: Annotated[ - Optional[int], - click.option("--pre-reordering-num-neighbors", type=int, help="Pre-reordering number of neighbors",) + int, + click.option("--pre-reordering-num-neighbors", type=int, help="Pre-reordering number of neighbors", default=200) ] max_top_neighbors_buffer_size: Annotated[ int,