From fd5ae3c39800a90eb6c45f0a8e12e1952b1162ea Mon Sep 17 00:00:00 2001
From: Navneet Verma
Date: Wed, 16 Oct 2024 14:24:52 -0700
Subject: [PATCH] Added optimization for OpenSearch

Signed-off-by: Navneet Verma
---
 .../clients/aws_opensearch/aws_opensearch.py  | 54 +++++++++++++++----
 .../backend/clients/aws_opensearch/config.py  | 18 ++++---
 .../backend/clients/aws_opensearch/run.py     | 37 +++++++++++--
 .../frontend/config/dbCaseConfigs.py          | 42 +++++++++++++++
 4 files changed, 135 insertions(+), 16 deletions(-)

diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
index 5b0728ac8..a27eb01fc 100644
--- a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
+++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
@@ -3,7 +3,7 @@ import time
 from typing import Iterable, Type
 from ..api import VectorDB, DBCaseConfig, DBConfig, IndexType
-from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig
+from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig, AWSOS_Engine
 from opensearchpy import OpenSearch
 from opensearchpy.helpers import bulk
 
@@ -83,7 +83,7 @@ def _create_index(self, client: OpenSearch):
 
     @contextmanager
     def init(self) -> None:
-        """connect to elasticsearch"""
+        """connect to OpenSearch"""
         self.client = OpenSearch(**self.db_config)
         yield
 
@@ -97,7 +97,7 @@ def insert_embeddings(
         metadata: list[int],
         **kwargs,
     ) -> tuple[int, Exception]:
-        """Insert the embeddings to the elasticsearch."""
+        """Insert the embeddings into OpenSearch."""
         assert self.client is not None, "should self.init() first"
 
         insert_data = []
@@ -136,13 +136,14 @@
         body = {
             "size": k,
-            "query": {"knn": {self.vector_col_name: {"vector": query, "k": k}}},
+            # The filter must be nested inside the knn clause (efficient filtering); "filter" is not a valid top-level search-body key.
+            "query": {"knn": {self.vector_col_name: {"vector": query, "k": k, **({"filter": {"range": {self.id_col_name: {"gt": filters["id"]}}}} if filters else {})}}},
         }
         try:
-            resp = self.client.search(index=self.index_name, body=body)
+            resp = self.client.search(index=self.index_name, body=body, _source=False, docvalue_fields=[self.id_col_name], stored_fields="_none_", filter_path=["took", "_shards", "hits.total", f"hits.hits.fields.{self.id_col_name}"])
             log.info(f'Search took: {resp["took"]}')
             log.info(f'Search shards: {resp["_shards"]}')
             log.info(f'Search hits total: {resp["hits"]["total"]}')
-            result = [int(d["_id"]) for d in resp["hits"]["hits"]]
+            result = [h["fields"][self.id_col_name][0] for h in resp["hits"]["hits"]]
             # log.info(f'success! length={len(res)}')
 
             return result
@@ -152,7 +153,46 @@ def search_embedding(
 
     def optimize(self):
         """optimize will be called between insertion and search in performance cases."""
-        pass
+        # Call refresh first to ensure that all segments are created
+        self._refresh_index()
+        self._do_force_merge()
+        # Call refresh again to ensure that the index is ready after the force merge
+        self._refresh_index()
+        # Ensure that all graphs are loaded into memory and ready for search
+        self._load_graphs_to_memory()
+
+    def _refresh_index(self):
+        log.debug(f"Starting refresh for index {self.index_name}")
+        SECONDS_WAITING_FOR_REFRESH_API_CALL_SEC = 30
+        while True:
+            try:
+                log.info("Starting the index refresh..")
+                self.client.indices.refresh(index=self.index_name)
+                break
+            except Exception as e:
+                log.info(
+                    f"Refresh errored out. Sleeping for {SECONDS_WAITING_FOR_REFRESH_API_CALL_SEC} sec before retrying: {e}")
+                time.sleep(SECONDS_WAITING_FOR_REFRESH_API_CALL_SEC)
+                continue
+        log.debug(f"Completed refresh for index {self.index_name}")
+
+    def _do_force_merge(self):
+        log.debug(f"Starting force merge for index {self.index_name}")
+        force_merge_endpoint = f'/{self.index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false'
+        force_merge_task_id = self.client.transport.perform_request('POST', force_merge_endpoint)['task']
+        SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30
+        while True:
+            time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC)
+            task_status = self.client.tasks.get(task_id=force_merge_task_id)
+            if task_status['completed']:
+                break
+        log.debug(f"Completed force merge for index {self.index_name}")
+
+    def _load_graphs_to_memory(self):
+        if self.case_config.engine != AWSOS_Engine.lucene:
+            log.info("Calling warmup API to load graphs into memory")
+            warmup_endpoint = f'/_plugins/_knn/warmup/{self.index_name}'
+            self.client.transport.perform_request('GET', warmup_endpoint)
 
     def ready_to_load(self):
         """ready_to_load will be called before load in load cases."""
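Note on the search_embedding() change: _source=False, docvalue_fields, stored_fields="_none_", and filter_path together strip the response down to the doc-value IDs (plus the few metadata keys the log statements read), so the query hot path no longer pays for _source or stored-field fetches. A minimal standalone sketch of the same request pattern, assuming a local cluster and hypothetical index/field names ("vdb_bench_index", "embedding", "id"):

    from opensearchpy import OpenSearch

    client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])
    resp = client.search(
        index="vdb_bench_index",  # hypothetical index name
        body={"size": 10, "query": {"knn": {"embedding": {"vector": [0.1] * 768, "k": 10}}}},
        _source=False,                        # skip _source entirely
        docvalue_fields=["id"],               # read ids from doc values
        stored_fields="_none_",               # do not load stored fields (including _id)
        filter_path=["hits.hits.fields.id"],  # drop everything else from the response
    )
    ids = [hit["fields"]["id"][0] for hit in resp["hits"]["hits"]]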
diff --git a/vectordb_bench/backend/clients/aws_opensearch/config.py b/vectordb_bench/backend/clients/aws_opensearch/config.py
index bc82380b7..15cd4ead8 100644
--- a/vectordb_bench/backend/clients/aws_opensearch/config.py
+++ b/vectordb_bench/backend/clients/aws_opensearch/config.py
@@ -1,9 +1,10 @@
+import logging
 from enum import Enum
 from pydantic import SecretStr, BaseModel
 
 from ..api import DBConfig, DBCaseConfig, MetricType, IndexType
 
-
+log = logging.getLogger(__name__)
 class AWSOpenSearchConfig(DBConfig, BaseModel):
     host: str = ""
     port: int = 443
@@ -31,14 +32,18 @@ class AWSOS_Engine(Enum):
 
 class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
     metric_type: MetricType = MetricType.L2
-    engine: AWSOS_Engine = AWSOS_Engine.nmslib
-    efConstruction: int = 360
-    M: int = 30
+    engine: AWSOS_Engine = AWSOS_Engine.faiss
+    efConstruction: int = 256
+    efSearch: int = 256
+    M: int = 16
 
     def parse_metric(self) -> str:
         if self.metric_type == MetricType.IP:
-            return "innerproduct" # only support faiss / nmslib, not for Lucene.
+            return "innerproduct"
         elif self.metric_type == MetricType.COSINE:
+            if self.engine == AWSOS_Engine.faiss:
+                log.info("Using innerproduct because the faiss engine in OpenSearch does not support cosinesimil")
+                return "innerproduct"
             return "cosinesimil"
         return "l2"
@@ -49,7 +54,8 @@ def index_param(self) -> dict:
             "engine": self.engine.value,
             "parameters": {
                 "ef_construction": self.efConstruction,
-                "m": self.M
+                "m": self.M,
+                "ef_search": self.efSearch
             }
         }
         return params
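Note on the new defaults: the engine moves from nmslib to faiss, and ef_search is now emitted alongside ef_construction and m in the method parameters. A hedged sketch of the index mapping these values end up in, assuming a hypothetical "embedding" field of dimension 768 and an illustrative index name:

    index_body = {
        "settings": {"index": {"knn": True}},  # required to enable k-NN on the index
        "mappings": {
            "properties": {
                "embedding": {         # hypothetical vector field
                    "type": "knn_vector",
                    "dimension": 768,  # illustrative dimension
                    "method": {
                        "name": "hnsw",
                        "space_type": "l2",
                        "engine": "faiss",
                        "parameters": {"ef_construction": 256, "m": 16, "ef_search": 256},
                    },
                }
            }
        },
    }
    client.indices.create(index="vdb_bench_index", body=index_body)

Mapping COSINE onto innerproduct is sound only for unit-norm vectors: for normalized embeddings, cosine similarity and inner product produce the same ranking.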
+ return "innerproduct" elif self.metric_type == MetricType.COSINE: + if self.engine == AWSOS_Engine.faiss: + log.info(f"Using metric type as innerproduct because faiss doesn't support cosine as metric type for Opensearch") + return "innerproduct" return "cosinesimil" return "l2" @@ -49,7 +54,8 @@ def index_param(self) -> dict: "engine": self.engine.value, "parameters": { "ef_construction": self.efConstruction, - "m": self.M + "m": self.M, + "ef_search": self.efSearch } } return params diff --git a/vectordb_bench/backend/clients/aws_opensearch/run.py b/vectordb_bench/backend/clients/aws_opensearch/run.py index 3924cbd75..d2698d139 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/run.py +++ b/vectordb_bench/backend/clients/aws_opensearch/run.py @@ -40,12 +40,12 @@ def create_index(client, index_name): "type": "knn_vector", "dimension": _DIM, "method": { - "engine": "nmslib", + "engine": "faiss", "name": "hnsw", "space_type": "l2", "parameters": { - "ef_construction": 128, - "m": 24, + "ef_construction": 256, + "m": 16, } } } @@ -108,12 +108,43 @@ def search(client, index_name): print('\nSearch not ready, sleep 1s') time.sleep(1) +def optimize_index(client, index_name): + print(f"Starting force merge for index {index_name}") + force_merge_endpoint = f'/{index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false' + force_merge_task_id = client.transport.perform_request('POST', force_merge_endpoint)['task'] + SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30 + while True: + time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC) + task_status = client.tasks.get(task_id=force_merge_task_id) + if task_status['completed']: + break + print(f"Completed force merge for index {index_name}") + + +def refresh_index(client, index_name): + print(f"Starting refresh for index {index_name}") + SECONDS_WAITING_FOR_REFRESH_API_CALL_SEC = 30 + while True: + try: + print(f"Starting the Refresh Index..") + client.indices.refresh(index=index_name) + break + except Exception as e: + print( + f"Refresh errored out. 
diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py
index 78d1936d7..68bf83f19 100644
--- a/vectordb_bench/frontend/config/dbCaseConfigs.py
+++ b/vectordb_bench/frontend/config/dbCaseConfigs.py
@@ -360,6 +360,37 @@ class CaseConfigInput(BaseModel):
     },
 )
 
+CaseConfigParamInput_EFConstruction_AWSOpensearch = CaseConfigInput(
+    label=CaseConfigParamType.EFConstruction,
+    inputType=InputType.Number,
+    inputConfig={
+        "min": 100,
+        "max": 1024,
+        "value": 256,
+    },
+)
+
+CaseConfigParamInput_M_AWSOpensearch = CaseConfigInput(
+    label=CaseConfigParamType.M,
+    inputType=InputType.Number,
+    inputConfig={
+        "min": 4,
+        "max": 64,
+        "value": 16,
+    },
+)
+
+CaseConfigParamInput_EF_SEARCH_AWSOpensearch = CaseConfigInput(
+    label=CaseConfigParamType.ef_search,
+    inputType=InputType.Number,
+    inputConfig={
+        "min": 100,
+        "max": 1024,
+        "value": 256,
+    },
+)
+
+
 CaseConfigParamInput_maintenance_work_mem_PgVector = CaseConfigInput(
     label=CaseConfigParamType.maintenance_work_mem,
     inputHelp="Recommended value: 1.33x the index size, not to exceed the available free memory."
@@ -839,6 +870,13 @@ class CaseConfigInput(BaseModel):
     CaseConfigParamInput_NumCandidates_ES,
 ]
 
+AWSOpenSearchLoadingConfig = [CaseConfigParamInput_EFConstruction_AWSOpensearch, CaseConfigParamInput_M_AWSOpensearch]
+AWSOpenSearchPerformanceConfig = [
+    CaseConfigParamInput_EFConstruction_AWSOpensearch,
+    CaseConfigParamInput_M_AWSOpensearch,
+    CaseConfigParamInput_EF_SEARCH_AWSOpensearch,
+]
+
 PgVectorLoadingConfig = [
     CaseConfigParamInput_IndexType_PgVector,
     CaseConfigParamInput_Lists_PgVector,
@@ -920,6 +958,10 @@ class CaseConfigInput(BaseModel):
         CaseLabel.Load: ESLoadingConfig,
         CaseLabel.Performance: ESPerformanceConfig,
     },
+    DB.AWSOpenSearch: {
+        CaseLabel.Load: AWSOpenSearchLoadingConfig,
+        CaseLabel.Performance: AWSOpenSearchPerformanceConfig,
+    },
     DB.PgVector: {
         CaseLabel.Load: PgVectorLoadingConfig,
         CaseLabel.Performance: PgVectorPerformanceConfig,