Skip to content

Commit

Permalink
Added optimization for OpenSearch
Browse files Browse the repository at this point in the history
Signed-off-by: Navneet Verma <[email protected]>
  • Loading branch information
navneet1v committed Oct 16, 2024
1 parent 5e7e438 commit fd5ae3c
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 15 deletions.
53 changes: 47 additions & 6 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from typing import Iterable, Type
from ..api import VectorDB, DBCaseConfig, DBConfig, IndexType
from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig
from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig, AWSOS_Engine
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

Expand Down Expand Up @@ -83,7 +83,7 @@ def _create_index(self, client: OpenSearch):

@contextmanager
def init(self) -> None:
"""connect to elasticsearch"""
"""connect to opensearch"""
self.client = OpenSearch(**self.db_config)

yield
Expand All @@ -97,7 +97,7 @@ def insert_embeddings(
metadata: list[int],
**kwargs,
) -> tuple[int, Exception]:
"""Insert the embeddings to the elasticsearch."""
"""Insert the embeddings to the opensearch."""
assert self.client is not None, "should self.init() first"

insert_data = []
Expand Down Expand Up @@ -136,13 +136,15 @@ def search_embedding(
body = {
"size": k,
"query": {"knn": {self.vector_col_name: {"vector": query, "k": k}}},
**({"filter": {"range": {self.id_col_name: {"gt": filters["id"]}}}} if filters else {})
}
try:
resp = self.client.search(index=self.index_name, body=body)
resp = self.client.search(index=self.index_name, body=body,size=k,_source=False,docvalue_fields=[self.id_col_name],stored_fields="_none_",filter_path=[f"hits.hits.fields.{self.id_col_name}"],)
log.info(f'Search took: {resp["took"]}')
log.info(f'Search shards: {resp["_shards"]}')
log.info(f'Search hits total: {resp["hits"]["total"]}')
result = [int(d["_id"]) for d in resp["hits"]["hits"]]
result = [h["fields"][self.id_col_name][0] for h in resp["hits"]["hits"]]
#result = [int(d["_id"]) for d in resp["hits"]["hits"]]
# log.info(f'success! length={len(res)}')

return result
Expand All @@ -152,7 +154,46 @@ def search_embedding(

def optimize(self):
    """optimize will be called between insertion and search in performance cases.

    Sequence: refresh -> force merge -> refresh -> warmup, so that search
    runs against a single merged segment with graphs already in memory.
    """
    # Removed leftover `pass` — it was dead code once the real body existed.
    # Call refresh first to ensure that all segments are created.
    self._refresh_index()
    self._do_force_merge()
    # Call refresh again to ensure that the index is ready after force merge.
    self._refresh_index()
    # Ensure that all graphs are loaded in memory and ready for search.
    self._load_graphs_to_memory()

def _refresh_index(self):
    """Refresh the index, retrying every 30 s until the call succeeds.

    The refresh is mandatory before search, so this deliberately retries
    forever instead of failing the benchmark run.
    """
    retry_interval_sec = 30
    log.debug("Starting refresh for index %s", self.index_name)
    while True:
        try:
            # Fixed: original used an f-string with no placeholders.
            log.info("Starting the Refresh Index..")
            self.client.indices.refresh(index=self.index_name)
            break
        except Exception as e:
            # Warning (not info) — this is an error path. Lazy %-args avoid
            # building the message when the level is filtered out.
            log.warning(
                "Refresh errored out. Sleeping for %d sec and then Retrying : %s",
                retry_interval_sec, e)
            time.sleep(retry_interval_sec)
    log.debug("Completed refresh for index %s", self.index_name)

def _do_force_merge(self):
    """Force-merge the index to a single segment and wait for completion.

    The merge is submitted asynchronously (wait_for_completion=false) and
    the returned task id is polled until the task reports completed.
    """
    log.debug("Starting force merge for index %s", self.index_name)
    force_merge_endpoint = f'/{self.index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false'
    force_merge_task_id = self.client.transport.perform_request('POST', force_merge_endpoint)['task']
    poll_interval_sec = 30
    while True:
        # Poll first, then sleep — the original slept 30 s before the first
        # status check, wasting time when the merge finishes quickly.
        task_status = self.client.tasks.get(task_id=force_merge_task_id)
        if task_status['completed']:
            break
        time.sleep(poll_interval_sec)
    log.debug("Completed force merge for index %s", self.index_name)

def _load_graphs_to_memory(self):
    """Call the k-NN warmup API so graphs are in memory before searching."""
    # Skipped for Lucene — the warmup endpoint belongs to the native k-NN
    # plugin engines (presumably faiss/nmslib only; confirm against the
    # OpenSearch k-NN plugin docs).
    if self.case_config.engine != AWSOS_Engine.lucene:
        log.info("Calling warmup API to load graphs into memory")
        warmup_endpoint = f'/_plugins/_knn/warmup/{self.index_name}'
        self.client.transport.perform_request('GET', warmup_endpoint)

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
Expand Down
18 changes: 12 additions & 6 deletions vectordb_bench/backend/clients/aws_opensearch/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from enum import Enum
from pydantic import SecretStr, BaseModel

from ..api import DBConfig, DBCaseConfig, MetricType, IndexType


log = logging.getLogger(__name__)
class AWSOpenSearchConfig(DBConfig, BaseModel):
host: str = ""
port: int = 443
Expand Down Expand Up @@ -31,14 +32,18 @@ class AWSOS_Engine(Enum):

class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
metric_type: MetricType = MetricType.L2
engine: AWSOS_Engine = AWSOS_Engine.nmslib
efConstruction: int = 360
M: int = 30
engine: AWSOS_Engine = AWSOS_Engine.faiss
efConstruction: int = 256
efSearch: int = 256
M: int = 16

def parse_metric(self) -> str:
    """Map the benchmark MetricType to an OpenSearch space_type string.

    Returns one of "innerproduct", "cosinesimil", or "l2" (default).
    """
    if self.metric_type == MetricType.IP:
        # innerproduct is only supported by faiss / nmslib, not by Lucene.
        return "innerproduct"
    elif self.metric_type == MetricType.COSINE:
        if self.engine == AWSOS_Engine.faiss:
            # NOTE(review): substituting innerproduct for cosine presumably
            # assumes normalized vectors — confirm with the ingestion path.
            log.info("Using metric type as innerproduct because faiss doesn't support cosine as metric type for Opensearch")
            return "innerproduct"
        return "cosinesimil"
    return "l2"

Expand All @@ -49,7 +54,8 @@ def index_param(self) -> dict:
"engine": self.engine.value,
"parameters": {
"ef_construction": self.efConstruction,
"m": self.M
"m": self.M,
"ef_search": self.efSearch
}
}
return params
Expand Down
37 changes: 34 additions & 3 deletions vectordb_bench/backend/clients/aws_opensearch/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ def create_index(client, index_name):
"type": "knn_vector",
"dimension": _DIM,
"method": {
"engine": "nmslib",
"engine": "faiss",
"name": "hnsw",
"space_type": "l2",
"parameters": {
"ef_construction": 128,
"m": 24,
"ef_construction": 256,
"m": 16,
}
}
}
Expand Down Expand Up @@ -108,12 +108,43 @@ def search(client, index_name):
print('\nSearch not ready, sleep 1s')
time.sleep(1)

def optimize_index(client, index_name):
    """Force-merge *index_name* down to one segment and wait until done.

    The merge runs asynchronously (wait_for_completion=false); the returned
    task id is polled until the task reports completed.
    """
    print(f"Starting force merge for index {index_name}")
    force_merge_endpoint = f'/{index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false'
    force_merge_task_id = client.transport.perform_request('POST', force_merge_endpoint)['task']
    poll_interval_sec = 30
    while True:
        # Poll first, then sleep — the original slept 30 s before the first
        # status check, wasting time when the merge finishes quickly.
        task_status = client.tasks.get(task_id=force_merge_task_id)
        if task_status['completed']:
            break
        time.sleep(poll_interval_sec)
    print(f"Completed force merge for index {index_name}")


def refresh_index(client, index_name):
    """Refresh *index_name*, retrying every 30 s until the call succeeds."""
    retry_wait_sec = 30
    print(f"Starting refresh for index {index_name}")
    refreshed = False
    while not refreshed:
        try:
            print("Starting the Refresh Index..")
            client.indices.refresh(index=index_name)
            refreshed = True
        except Exception as err:
            print(
                f"Refresh errored out. Sleeping for {retry_wait_sec} sec and then Retrying : {err}")
            time.sleep(retry_wait_sec)
    print(f"Completed refresh for index {index_name}")



def main():
client = create_client()
try:
create_index(client, _INDEX_NAME)
bulk_insert(client, _INDEX_NAME)
optimize_index(client, _INDEX_NAME)
refresh_index(client, _INDEX_NAME)
search(client, _INDEX_NAME)
delete_index(client, _INDEX_NAME)
except Exception as e:
Expand Down
42 changes: 42 additions & 0 deletions vectordb_bench/frontend/config/dbCaseConfigs.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,37 @@ class CaseConfigInput(BaseModel):
},
)

# UI input for the HNSW build parameter ef_construction on AWS OpenSearch.
# Default 256 matches AWSOpenSearchIndexConfig.efConstruction.
CaseConfigParamInput_EFConstruction_AWSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.EFConstruction,
    inputType=InputType.Number,
    inputConfig={
        "min": 100,
        "max": 1024,
        "value": 256,
    },
)

# UI input for the HNSW graph degree M on AWS OpenSearch.
# Default 16 matches AWSOpenSearchIndexConfig.M.
CaseConfigParamInput_M_AWSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.M,
    inputType=InputType.Number,
    inputConfig={
        "min": 4,
        "max": 64,
        "value": 16,
    },
)

# UI input for the HNSW query parameter ef_search on AWS OpenSearch.
# Default 256 matches AWSOpenSearchIndexConfig.efSearch.
CaseConfigParamInput_EF_SEARCH_AWSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.ef_search,
    inputType=InputType.Number,
    inputConfig={
        "min": 100,
        "max": 1024,
        "value": 256,
    },
)


CaseConfigParamInput_maintenance_work_mem_PgVector = CaseConfigInput(
label=CaseConfigParamType.maintenance_work_mem,
inputHelp="Recommended value: 1.33x the index size, not to exceed the available free memory."
Expand Down Expand Up @@ -839,6 +870,13 @@ class CaseConfigInput(BaseModel):
CaseConfigParamInput_NumCandidates_ES,
]

# Parameter sets shown for AWS OpenSearch: build-time params for Load cases,
# build + query (ef_search) params for Performance cases.
# NOTE(review): casing is inconsistent ("AWSOpensearch..." vs "AWSOpenSearch...")
# — left as-is since these names are referenced elsewhere in this module.
AWSOpensearchLoadingConfig = [CaseConfigParamInput_EFConstruction_AWSOpensearch, CaseConfigParamInput_M_AWSOpensearch]
AWSOpenSearchPerformanceConfig = [
    CaseConfigParamInput_EFConstruction_AWSOpensearch,
    CaseConfigParamInput_M_AWSOpensearch,
    CaseConfigParamInput_EF_SEARCH_AWSOpensearch,
]

PgVectorLoadingConfig = [
CaseConfigParamInput_IndexType_PgVector,
CaseConfigParamInput_Lists_PgVector,
Expand Down Expand Up @@ -920,6 +958,10 @@ class CaseConfigInput(BaseModel):
CaseLabel.Load: ESLoadingConfig,
CaseLabel.Performance: ESPerformanceConfig,
},
DB.AWSOpenSearch: {
CaseLabel.Load: AWSOpensearchLoadingConfig,
CaseLabel.Performance: AWSOpenSearchPerformanceConfig,
},
DB.PgVector: {
CaseLabel.Load: PgVectorLoadingConfig,
CaseLabel.Performance: PgVectorPerformanceConfig,
Expand Down

0 comments on commit fd5ae3c

Please sign in to comment.