Skip to content

Commit

Permalink
Optimize pgvector test for semi-recent enhancements
Browse files Browse the repository at this point in the history
This commit adds several changes to the pgvector test to create a
more representative test environment based on recent and older
changes to pgvector. Notable changes include allowing for testing
of parallel index building parameters, loading data with the recommended
binary loading method, and other changes to better emulate what a
typical user of pgvector would do.

This commit also includes some general cleanups.

Co-authored-by: Mark Greenhalgh <[email protected]>
Co-authored-by: Tyler House <[email protected]>
  • Loading branch information
3 people committed May 8, 2024
1 parent 2a64abe commit 3e5d0c3
Show file tree
Hide file tree
Showing 7 changed files with 494 additions and 146 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ all = [
"redis",
"chromadb",
"psycopg2",
"psycopg",
]

qdrant = [ "qdrant-client" ]
pinecone = [ "pinecone-client" ]
weaviate = [ "weaviate-client" ]
elastic = [ "elasticsearch" ]
pgvector = [ "pgvector", "psycopg2" ]
pgvector = [ "pgvector", "psycopg" ]
pgvecto_rs = [ "psycopg2" ]
redis = [ "redis" ]
chromadb = [ "chromadb" ]
Expand Down
1 change: 1 addition & 0 deletions vectordb_bench/backend/clients/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class IndexType(str, Enum):
Flat = "FLAT"
AUTOINDEX = "AUTOINDEX"
ES_HNSW = "hnsw"
ES_IVFFlat = "ivfflat"
GPU_IVF_FLAT = "GPU_IVF_FLAT"
GPU_IVF_PQ = "GPU_IVF_PQ"
GPU_CAGRA = "GPU_CAGRA"
Expand Down
204 changes: 165 additions & 39 deletions vectordb_bench/backend/clients/pgvector/config.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,64 @@
from abc import abstractmethod
from typing import Any, Mapping, Optional, Sequence, TypedDict

from psycopg import sql
from pydantic import BaseModel, SecretStr
from ..api import DBConfig, DBCaseConfig, IndexType, MetricType
from typing_extensions import LiteralString

from ..api import DBCaseConfig, DBConfig, IndexType, MetricType

POSTGRE_URL_PLACEHOLDER = "postgresql://%s:%s@%s/%s"


class PgVectorConfigDict(TypedDict):
    """Connection parameters in the shape returned by ``PgVectorConfig.to_dict``.

    These keys are used directly as keyword arguments when building the
    psycopg connection, so the names must exactly match the psycopg API.
    """

    user: str
    password: str
    host: str
    port: int
    dbname: str


class PgVectorConfig(DBConfig):
user_name: SecretStr = "postgres"
user_name: SecretStr = SecretStr("postgres")
password: SecretStr
host: str = "localhost"
port: int = 5432
db_name: str

def to_dict(self) -> dict:
def to_dict(self) -> PgVectorConfigDict:
user_str = self.user_name.get_secret_value()
pwd_str = self.password.get_secret_value()
return {
"host" : self.host,
"port" : self.port,
"dbname" : self.db_name,
"user" : user_str,
"password" : pwd_str
"host": self.host,
"port": self.port,
"dbname": self.db_name,
"user": user_str,
"password": pwd_str,
}


class PgVectorIndexParam(TypedDict):
    """Index-build settings returned by the ``index_param()`` methods."""

    # Operator-class name produced by ``parse_metric()`` (e.g. "vector_ip_ops").
    metric: str
    # String value of the configured ``IndexType`` (``self.index.value``).
    index_type: str
    # Pre-composed ``WITH (...)`` clause; an empty ``sql.Composed`` when no
    # index option is set.
    index_creation_with_clause: sql.Composed
    # Passed through unchanged from the case config; ``None`` when not set.
    maintenance_work_mem: Optional[str]
    # Passed through unchanged from the case config; ``None`` when not set.
    max_parallel_workers: Optional[int]


class PgVectorSearchParam(TypedDict):
    """Search settings returned by the ``search_param()`` methods."""

    # Distance operator produced by ``parse_metric_fun_op()`` (e.g. "<->").
    metric_fun_op: LiteralString


class PgVectorSessionCommands(TypedDict):
    """Session settings returned by the ``session_param()`` methods."""

    # Composed ``SET ...;`` statements built by
    # ``_optionally_build_set_commands``; may be empty.
    session_commands: Sequence[sql.Composed]


class PgVectorIndexConfig(BaseModel, DBCaseConfig):
metric_type: MetricType | None = None
index: IndexType
create_index_before_load: bool = False
create_index_after_load: bool = True

def parse_metric(self) -> str:
if self.metric_type == MetricType.L2:
Expand All @@ -32,7 +67,7 @@ def parse_metric(self) -> str:
return "vector_ip_ops"
return "vector_cosine_ops"

def parse_metric_fun_op(self) -> str:
def parse_metric_fun_op(self) -> LiteralString:
if self.metric_type == MetricType.L2:
return "<->"
elif self.metric_type == MetricType.IP:
Expand All @@ -46,48 +81,139 @@ def parse_metric_fun_str(self) -> str:
return "max_inner_product"
return "cosine_distance"

@abstractmethod
def index_param(self) -> PgVectorIndexParam:
...

@abstractmethod
def search_param(self) -> PgVectorSearchParam:
...

@abstractmethod
def session_param(self) -> PgVectorSessionCommands:
...

@staticmethod
def _optionally_build_with_clause(with_options: Mapping[str, Any]) -> sql.Composed:
    """Build the ``WITH (...)`` clause used when creating an index.

    Each option whose value is not ``None`` is rendered as
    ``"name" = "value"`` and the rendered options are comma-joined inside
    ``WITH (...);``. The clause carries its own terminating semicolon.

    Args:
        with_options: Mapping of option name to value. Entries whose value
            is ``None`` are skipped; every other value is converted with
            ``str()`` before being rendered.

    Returns:
        The composed clause, or an empty ``sql.Composed`` when no option
        has a value.
    """
    options = [
        sql.SQL("{option_name} = {val}").format(
            option_name=sql.Identifier(option_name),
            # The value is rendered with sql.Identifier (i.e. double-quoted)
            # rather than sql.Literal; kept as-is so the generated SQL text
            # does not change.
            val=sql.Identifier(str(value)),
        )
        for option_name, value in with_options.items()
        if value is not None
    ]

    # Check the list for emptiness directly: any() would test the truthiness
    # of each composed fragment, not whether any option was collected.
    if not options:
        return sql.Composed(())

    return sql.SQL("WITH ({});").format(sql.SQL(", ").join(options))

@staticmethod
def _optionally_build_set_commands(
    set_mapping: Mapping[str, Any]
) -> Sequence[sql.Composed]:
    """Build one ``SET "name" = "value";`` command per truthy setting.

    Args:
        set_mapping: Mapping of setting name to value. Values are converted
            with ``str()`` before being rendered.

    Returns:
        A list of composed ``SET`` statements, in mapping order; empty when
        no setting has a truthy value.
    """
    set_template = sql.SQL("SET {setting_name} = {val};")
    return [
        set_template.format(
            setting_name=sql.Identifier(name),
            val=sql.Identifier(str(setting)),
        )
        for name, setting in set_mapping.items()
        # Truthiness filter: skips None, and also 0, "" and False.
        if setting
    ]


class PgVectorIVFFlatConfig(PgVectorIndexConfig):
"""
An IVFFlat index divides vectors into lists, and then searches a subset of those lists that are
closest to the query vector. It has faster build times and uses less memory than HNSW,
but has lower query performance (in terms of speed-recall tradeoff).
Three keys to achieving good recall are:
Create the index after the table has some data
Choose an appropriate number of lists - a good place to start is rows / 1000 for up to 1M rows and sqrt(rows) for over 1M rows
When querying, specify an appropriate number of probes (higher is better for recall, lower is better for speed) - a good place to start is sqrt(lists)
"""

lists: int | None
probes: int | None
index: IndexType = IndexType.ES_IVFFlat
maintenance_work_mem: Optional[str] = None
max_parallel_workers: Optional[int] = None

def index_param(self) -> PgVectorIndexParam:
index_parameters = {"lists": self.lists}
return {
"metric": self.parse_metric(),
"index_type": self.index.value,
"index_creation_with_clause": self._optionally_build_with_clause(
index_parameters
),
"maintenance_work_mem": self.maintenance_work_mem,
"max_parallel_workers": self.max_parallel_workers,
}


class HNSWConfig(PgVectorIndexConfig):
M: int
efConstruction: int
ef: int | None = None
index: IndexType = IndexType.HNSW

def index_param(self) -> dict:
def search_param(self) -> PgVectorSearchParam:
return {
"m" : self.M,
"ef_construction" : self.efConstruction,
"metric" : self.parse_metric()
"metric_fun_op": self.parse_metric_fun_op(),
}

def search_param(self) -> dict:
def session_param(self) -> PgVectorSessionCommands:
session_parameters = {"ivfflat.probes": self.probes}
return {
"ef" : self.ef,
"metric_fun" : self.parse_metric_fun_str(),
"metric_fun_op" : self.parse_metric_fun_op(),
"session_commands": self._optionally_build_set_commands(session_parameters)
}


class IVFFlatConfig(PgVectorIndexConfig):
lists: int | None = 1000
probes: int | None = 10
index: IndexType = IndexType.IVFFlat
class PgVectorHNSWConfig(PgVectorIndexConfig):
"""
An HNSW index creates a multilayer graph. It has better query performance than IVFFlat (in terms of
speed-recall tradeoff), but has slower build times and uses more memory. Also, an index can be
created without any data in the table since there isn't a training step like IVFFlat.
"""

m: int | None # DETAIL: Valid values are between "2" and "100".
ef_construction: (
int | None
) # ef_construction must be greater than or equal to 2 * m
ef_search: int | None
index: IndexType = IndexType.ES_HNSW
maintenance_work_mem: Optional[str] = None
max_parallel_workers: Optional[int] = None

def index_param(self) -> PgVectorIndexParam:
index_parameters = {"m": self.m, "ef_construction": self.ef_construction}
return {
"metric": self.parse_metric(),
"index_type": self.index.value,
"index_creation_with_clause": self._optionally_build_with_clause(
index_parameters
),
"maintenance_work_mem": self.maintenance_work_mem,
"max_parallel_workers": self.max_parallel_workers,
}

def index_param(self) -> dict:
def search_param(self) -> PgVectorSearchParam:
return {
"lists" : self.lists,
"metric" : self.parse_metric()
"metric_fun_op": self.parse_metric_fun_op(),
}

def search_param(self) -> dict:
def session_param(self) -> PgVectorSessionCommands:
session_parameters = {"hnsw.ef_search": self.ef_search}
return {
"probes" : self.probes,
"metric_fun" : self.parse_metric_fun_str(),
"metric_fun_op" : self.parse_metric_fun_op(),
"session_commands": self._optionally_build_set_commands(session_parameters)
}

_pgvector_case_config = {
IndexType.HNSW: HNSWConfig,
IndexType.IVFFlat: IVFFlatConfig,
}
IndexType.HNSW: PgVectorHNSWConfig,
IndexType.ES_HNSW: PgVectorHNSWConfig,
IndexType.IVFFlat: PgVectorIVFFlatConfig,
}
Loading

0 comments on commit 3e5d0c3

Please sign in to comment.