Skip to content

Commit

Permalink
Merge branch 'main' into diskann-support
Browse files Browse the repository at this point in the history
  • Loading branch information
wahajali authored Oct 28, 2024
2 parents 69ce9f4 + c66dfb5 commit 89352d9
Show file tree
Hide file tree
Showing 23 changed files with 649 additions and 138 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ Options:
--ef-search INTEGER hnsw ef-search
--quantization-type [none|halfvec]
quantization type for vectors
--custom-case-name TEXT Custom case name i.e. PerformanceCase1536D50K
--custom-case-description TEXT Custom name description
--custom-case-load-timeout INTEGER
Custom case load timeout [default: 36000]
--custom-case-optimize-timeout INTEGER
Custom case optimize timeout [default: 36000]
--custom-dataset-name TEXT
Dataset name i.e OpenAI
--custom-dataset-dir TEXT Dataset directory i.e. openai_medium_500k
--custom-dataset-size INTEGER Dataset size i.e. 500000
--custom-dataset-dim INTEGER Dataset dimension
--custom-dataset-metric-type TEXT
Dataset distance metric [default: COSINE]
--custom-dataset-file-count INTEGER
Dataset file count
--custom-dataset-use-shuffled / --skip-custom-dataset-use-shuffled
Use shuffled custom dataset or skip [default: custom-dataset-
use-shuffled]
--custom-dataset-with-gt / --skip-custom-dataset-with-gt
Custom dataset with ground truth or skip [default: custom-dataset-
with-gt]
--help Show this message and exit.
```
#### Using a configuration file.
Expand Down Expand Up @@ -464,6 +485,8 @@ def ZillizAutoIndex(**parameters: Unpack[ZillizTypedDict]):
3. Update db_config and db_case_config to match client requirements
4. Continue to add new functions for each index config.
5. Import the client cli module and command to vectordb_bench/cli/vectordbbench.py (for databases with multiple commands (index configs), this only needs to be done for one command)
6. Import the `get_custom_case_config` function from `vectordb_bench/cli/cli.py` and use it to add a new key `custom_case` to the `parameters` variable within the command.


> cli modules with multiple index configs:
> - pgvector: vectordb_bench/backend/clients/pgvector/cli.py
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ all = [
"weaviate-client",
"elasticsearch",
"pgvector",
"pgvecto_rs[psycopg3]>=0.2.1",
"pgvecto_rs[psycopg3]>=0.2.2",
"sqlalchemy",
"redis",
"chromadb",
Expand All @@ -73,7 +73,7 @@ elastic = [ "elasticsearch" ]
pgvector = [ "psycopg", "psycopg-binary", "pgvector" ]
pgvectorscale = [ "psycopg", "psycopg-binary", "pgvector" ]
pgdiskann = [ "psycopg", "psycopg-binary", "pgvector" ]
pgvecto_rs = [ "pgvecto_rs[psycopg3]>=0.2.1" ]
pgvecto_rs = [ "pgvecto_rs[psycopg3]>=0.2.2" ]
redis = [ "redis" ]
memorydb = [ "memorydb" ]
chromadb = [ "chromadb" ]
Expand Down
53 changes: 47 additions & 6 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from typing import Iterable, Type
from ..api import VectorDB, DBCaseConfig, DBConfig, IndexType
from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig
from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig, AWSOS_Engine
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

Expand Down Expand Up @@ -83,7 +83,7 @@ def _create_index(self, client: OpenSearch):

@contextmanager
def init(self) -> None:
"""connect to elasticsearch"""
"""connect to opensearch"""
self.client = OpenSearch(**self.db_config)

yield
Expand All @@ -97,7 +97,7 @@ def insert_embeddings(
metadata: list[int],
**kwargs,
) -> tuple[int, Exception]:
"""Insert the embeddings to the elasticsearch."""
"""Insert the embeddings to the opensearch."""
assert self.client is not None, "should self.init() first"

insert_data = []
Expand Down Expand Up @@ -136,13 +136,15 @@ def search_embedding(
body = {
"size": k,
"query": {"knn": {self.vector_col_name: {"vector": query, "k": k}}},
**({"filter": {"range": {self.id_col_name: {"gt": filters["id"]}}}} if filters else {})
}
try:
resp = self.client.search(index=self.index_name, body=body)
resp = self.client.search(index=self.index_name, body=body,size=k,_source=False,docvalue_fields=[self.id_col_name],stored_fields="_none_",filter_path=[f"hits.hits.fields.{self.id_col_name}"],)
log.info(f'Search took: {resp["took"]}')
log.info(f'Search shards: {resp["_shards"]}')
log.info(f'Search hits total: {resp["hits"]["total"]}')
result = [int(d["_id"]) for d in resp["hits"]["hits"]]
result = [h["fields"][self.id_col_name][0] for h in resp["hits"]["hits"]]
#result = [int(d["_id"]) for d in resp["hits"]["hits"]]
# log.info(f'success! length={len(res)}')

return result
Expand All @@ -152,7 +154,46 @@ def search_embedding(

def optimize(self):
    """optimize will be called between insertion and search in performance cases.

    Sequence: refresh -> force merge -> refresh -> warmup. This flushes all
    buffered segments, merges them into one for stable search latency, makes
    the merged index searchable, then pre-loads the k-NN graphs into memory.
    """
    # Removed a leftover no-op `pass` from the previous stub implementation.
    # Call refresh first to ensure that all segments are created
    self._refresh_index()
    self._do_force_merge()
    # Call refresh again to ensure that the index is ready after force merge.
    self._refresh_index()
    # ensure that all graphs are loaded in memory and ready for search
    self._load_graphs_to_memory()

def _refresh_index(self):
    """Refresh the index, retrying until the refresh API call succeeds.

    A refresh makes all buffered operations visible to search. The call is
    retried indefinitely because the cluster may reject it while busy
    (e.g. right after a bulk load or a force merge).
    """
    log.debug(f"Starting refresh for index {self.index_name}")
    SECONDS_WAITING_FOR_REFRESH_API_CALL_SEC = 30
    while True:
        try:
            # Fixed: was an f-string with no placeholders (ruff F541).
            log.info("Starting the Refresh Index..")
            self.client.indices.refresh(index=self.index_name)
            break
        except Exception as e:
            # Deliberate best-effort retry loop: sleep and try again.
            log.info(
                f"Refresh errored out. Sleeping for {SECONDS_WAITING_FOR_REFRESH_API_CALL_SEC} sec and then Retrying : {e}")
            time.sleep(SECONDS_WAITING_FOR_REFRESH_API_CALL_SEC)
            continue
    log.debug(f"Completed refresh for index {self.index_name}")

def _do_force_merge(self):
    """Merge the index down to a single segment via the asynchronous
    _forcemerge API, polling the tasks API until the merge completes."""
    log.debug(f"Starting force merge for index {self.index_name}")
    # wait_for_completion=false hands back a task id instead of holding the
    # HTTP connection open for the whole (potentially long) merge.
    force_merge_endpoint = f'/{self.index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false'
    force_merge_task_id = self.client.transport.perform_request('POST', force_merge_endpoint)['task']
    SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30
    completed = False
    while not completed:
        # Sleep first: the task has only just been submitted.
        time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC)
        completed = self.client.tasks.get(task_id=force_merge_task_id)['completed']
    log.debug(f"Completed force merge for index {self.index_name}")

def _load_graphs_to_memory(self):
    """Warm the k-NN graphs into memory so the first searches are not cold.

    The warmup API is skipped for the Lucene engine; it is only invoked for
    the other engines.
    """
    # Guard clause instead of wrapping the body in the conditional.
    if self.case_config.engine == AWSOS_Engine.lucene:
        return
    log.info("Calling warmup API to load graphs into memory")
    warmup_endpoint = f'/_plugins/_knn/warmup/{self.index_name}'
    self.client.transport.perform_request('GET', warmup_endpoint)

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
Expand Down
18 changes: 12 additions & 6 deletions vectordb_bench/backend/clients/aws_opensearch/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from enum import Enum
from pydantic import SecretStr, BaseModel

from ..api import DBConfig, DBCaseConfig, MetricType, IndexType


log = logging.getLogger(__name__)
class AWSOpenSearchConfig(DBConfig, BaseModel):
host: str = ""
port: int = 443
Expand Down Expand Up @@ -31,14 +32,18 @@ class AWSOS_Engine(Enum):

class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
metric_type: MetricType = MetricType.L2
engine: AWSOS_Engine = AWSOS_Engine.nmslib
efConstruction: int = 360
M: int = 30
engine: AWSOS_Engine = AWSOS_Engine.faiss
efConstruction: int = 256
efSearch: int = 256
M: int = 16

def parse_metric(self) -> str:
if self.metric_type == MetricType.IP:
return "innerproduct" # only support faiss / nmslib, not for Lucene.
return "innerproduct"
elif self.metric_type == MetricType.COSINE:
if self.engine == AWSOS_Engine.faiss:
log.info(f"Using metric type as innerproduct because faiss doesn't support cosine as metric type for Opensearch")
return "innerproduct"
return "cosinesimil"
return "l2"

Expand All @@ -49,7 +54,8 @@ def index_param(self) -> dict:
"engine": self.engine.value,
"parameters": {
"ef_construction": self.efConstruction,
"m": self.M
"m": self.M,
"ef_search": self.efSearch
}
}
return params
Expand Down
37 changes: 34 additions & 3 deletions vectordb_bench/backend/clients/aws_opensearch/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ def create_index(client, index_name):
"type": "knn_vector",
"dimension": _DIM,
"method": {
"engine": "nmslib",
"engine": "faiss",
"name": "hnsw",
"space_type": "l2",
"parameters": {
"ef_construction": 128,
"m": 24,
"ef_construction": 256,
"m": 16,
}
}
}
Expand Down Expand Up @@ -108,12 +108,43 @@ def search(client, index_name):
print('\nSearch not ready, sleep 1s')
time.sleep(1)

def optimize_index(client, index_name, poll_interval_sec=30):
    """Force-merge *index_name* down to a single segment and wait for it.

    Submits an asynchronous _forcemerge task and polls the tasks API until
    the task reports completion.

    Args:
        client: OpenSearch client instance.
        index_name: Name of the index to force-merge.
        poll_interval_sec: Seconds to wait between task-status polls
            (parameterized; previously a hard-coded 30s constant).
    """
    print(f"Starting force merge for index {index_name}")
    # wait_for_completion=false returns immediately with a task id we can
    # poll, avoiding HTTP timeouts on long-running merges.
    force_merge_endpoint = f'/{index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false'
    force_merge_task_id = client.transport.perform_request('POST', force_merge_endpoint)['task']
    while True:
        time.sleep(poll_interval_sec)
        task_status = client.tasks.get(task_id=force_merge_task_id)
        if task_status['completed']:
            break
    print(f"Completed force merge for index {index_name}")


def refresh_index(client, index_name, retry_interval_sec=30):
    """Refresh *index_name*, retrying until the refresh call succeeds.

    A refresh makes all buffered operations visible to search; failures are
    treated as transient (e.g. cluster busy right after a bulk load).

    Args:
        client: OpenSearch client instance.
        index_name: Name of the index to refresh.
        retry_interval_sec: Seconds to sleep between retries after a failure
            (parameterized; previously a hard-coded 30s constant).
    """
    print(f"Starting refresh for index {index_name}")
    while True:
        try:
            # Fixed: was an f-string with no placeholders (ruff F541).
            print("Starting the Refresh Index..")
            client.indices.refresh(index=index_name)
            break
        except Exception as e:
            # Deliberate best-effort retry loop: log, sleep, try again.
            print(
                f"Refresh errored out. Sleeping for {retry_interval_sec} sec and then Retrying : {e}")
            time.sleep(retry_interval_sec)
            continue
    print(f"Completed refresh for index {index_name}")



def main():
client = create_client()
try:
create_index(client, _INDEX_NAME)
bulk_insert(client, _INDEX_NAME)
optimize_index(client, _INDEX_NAME)
refresh_index(client, _INDEX_NAME)
search(client, _INDEX_NAME)
delete_index(client, _INDEX_NAME)
except Exception as e:
Expand Down
3 changes: 3 additions & 0 deletions vectordb_bench/backend/clients/pgvector/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
IVFFlatTypedDict,
cli,
click_parameter_decorators_from_typed_dict,
get_custom_case_config,
run,
)
from vectordb_bench.backend.clients import DB
Expand Down Expand Up @@ -77,6 +78,7 @@ def PgVectorIVFFlat(
):
from .config import PgVectorConfig, PgVectorIVFFlatConfig

parameters["custom_case"] = get_custom_case_config(parameters)
run(
db=DB.PgVector,
db_config=PgVectorConfig(
Expand Down Expand Up @@ -107,6 +109,7 @@ def PgVectorHNSW(
):
from .config import PgVectorConfig, PgVectorHNSWConfig

parameters["custom_case"] = get_custom_case_config(parameters)
run(
db=DB.PgVector,
db_config=PgVectorConfig(
Expand Down
2 changes: 0 additions & 2 deletions vectordb_bench/backend/clients/pinecone/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@

class PineconeConfig(DBConfig):
api_key: SecretStr
environment: SecretStr
index_name: str

def to_dict(self) -> dict:
return {
"api_key": self.api_key.get_secret_value(),
"environment": self.environment.get_secret_value(),
"index_name": self.index_name,
}
Loading

0 comments on commit 89352d9

Please sign in to comment.