Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added optimization for Opensearch and also included the options to tweak the hyper parameters #378

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from typing import Iterable, Type
from ..api import VectorDB, DBCaseConfig, DBConfig, IndexType
from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig
from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig, AWSOS_Engine
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

Expand Down Expand Up @@ -83,7 +83,7 @@ def _create_index(self, client: OpenSearch):

@contextmanager
def init(self) -> None:
"""connect to elasticsearch"""
"""connect to opensearch"""
self.client = OpenSearch(**self.db_config)

yield
Expand All @@ -97,7 +97,7 @@ def insert_embeddings(
metadata: list[int],
**kwargs,
) -> tuple[int, Exception]:
"""Insert the embeddings to the elasticsearch."""
"""Insert the embeddings to the opensearch."""
assert self.client is not None, "should self.init() first"

insert_data = []
Expand Down Expand Up @@ -136,13 +136,15 @@ def search_embedding(
body = {
"size": k,
"query": {"knn": {self.vector_col_name: {"vector": query, "k": k}}},
**({"filter": {"range": {self.id_col_name: {"gt": filters["id"]}}}} if filters else {})
}
try:
resp = self.client.search(index=self.index_name, body=body)
resp = self.client.search(index=self.index_name, body=body,size=k,_source=False,docvalue_fields=[self.id_col_name],stored_fields="_none_",filter_path=[f"hits.hits.fields.{self.id_col_name}"],)
log.info(f'Search took: {resp["took"]}')
log.info(f'Search shards: {resp["_shards"]}')
log.info(f'Search hits total: {resp["hits"]["total"]}')
result = [int(d["_id"]) for d in resp["hits"]["hits"]]
result = [h["fields"][self.id_col_name][0] for h in resp["hits"]["hits"]]
#result = [int(d["_id"]) for d in resp["hits"]["hits"]]
# log.info(f'success! length={len(res)}')

return result
Expand All @@ -152,7 +154,46 @@ def search_embedding(

def optimize(self):
    """optimize will be called between insertion and search in performance cases.

    Refreshes the index, force-merges all segments into one, refreshes again,
    then warms up the k-NN graphs so they are memory-resident before search.
    """
    # Refresh first to ensure that all segments are created.
    self._refresh_index()
    self._do_force_merge()
    # Refresh again to ensure that the index is ready after force merge.
    self._refresh_index()
    # Ensure that all graphs are loaded in memory and ready for search.
    self._load_graphs_to_memory()

def _refresh_index(self):
    """Issue a blocking refresh of ``self.index_name``, retrying until it succeeds.

    The refresh call can error out (e.g. while the cluster is busy after a bulk
    load), so on any exception we sleep and retry indefinitely.
    """
    retry_interval_sec = 30
    log.debug("Starting refresh for index %s", self.index_name)
    while True:
        try:
            log.info("Starting the Refresh Index..")
            self.client.indices.refresh(index=self.index_name)
            break
        except Exception as e:
            # Lazy %-style args so formatting only happens when the record is emitted.
            log.info(
                "Refresh errored out. Sleeping for %s sec and then Retrying : %s",
                retry_interval_sec, e)
            time.sleep(retry_interval_sec)
    log.debug("Completed refresh for index %s", self.index_name)

def _do_force_merge(self):
    """Force-merge the index down to a single segment and wait for completion.

    The merge is submitted asynchronously (``wait_for_completion=false``) and
    the returned task is polled until the Tasks API reports it completed.
    """
    poll_interval_sec = 30
    log.debug("Starting force merge for index %s", self.index_name)
    force_merge_endpoint = f'/{self.index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false'
    force_merge_task_id = self.client.transport.perform_request('POST', force_merge_endpoint)['task']
    while True:
        # Poll before sleeping so an already-finished merge does not cost a full interval.
        task_status = self.client.tasks.get(task_id=force_merge_task_id)
        if task_status['completed']:
            break
        time.sleep(poll_interval_sec)
    log.debug("Completed force merge for index %s", self.index_name)

def _load_graphs_to_memory(self):
    """Warm up the k-NN index so its graphs are loaded into memory before search.

    Skipped for the Lucene engine — NOTE(review): presumably the warmup plugin
    API only applies to the faiss/nmslib engines; confirm against OpenSearch docs.
    """
    if self.case_config.engine == AWSOS_Engine.lucene:
        return
    log.info("Calling warmup API to load graphs into memory")
    warmup_endpoint = f"/_plugins/_knn/warmup/{self.index_name}"
    self.client.transport.perform_request("GET", warmup_endpoint)

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
Expand Down
18 changes: 12 additions & 6 deletions vectordb_bench/backend/clients/aws_opensearch/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from enum import Enum
from pydantic import SecretStr, BaseModel

from ..api import DBConfig, DBCaseConfig, MetricType, IndexType


log = logging.getLogger(__name__)
class AWSOpenSearchConfig(DBConfig, BaseModel):
host: str = ""
port: int = 443
Expand Down Expand Up @@ -31,14 +32,18 @@ class AWSOS_Engine(Enum):

class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
metric_type: MetricType = MetricType.L2
engine: AWSOS_Engine = AWSOS_Engine.nmslib
efConstruction: int = 360
M: int = 30
engine: AWSOS_Engine = AWSOS_Engine.faiss
efConstruction: int = 256
efSearch: int = 256
M: int = 16

def parse_metric(self) -> str:
if self.metric_type == MetricType.IP:
return "innerproduct" # only support faiss / nmslib, not for Lucene.
return "innerproduct"
elif self.metric_type == MetricType.COSINE:
if self.engine == AWSOS_Engine.faiss:
log.info(f"Using metric type as innerproduct because faiss doesn't support cosine as metric type for Opensearch")
return "innerproduct"
return "cosinesimil"
return "l2"

Expand All @@ -49,7 +54,8 @@ def index_param(self) -> dict:
"engine": self.engine.value,
"parameters": {
"ef_construction": self.efConstruction,
"m": self.M
"m": self.M,
"ef_search": self.efSearch
}
}
return params
Expand Down
37 changes: 34 additions & 3 deletions vectordb_bench/backend/clients/aws_opensearch/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ def create_index(client, index_name):
"type": "knn_vector",
"dimension": _DIM,
"method": {
"engine": "nmslib",
"engine": "faiss",
"name": "hnsw",
"space_type": "l2",
"parameters": {
"ef_construction": 128,
"m": 24,
"ef_construction": 256,
"m": 16,
}
}
}
Expand Down Expand Up @@ -108,12 +108,43 @@ def search(client, index_name):
print('\nSearch not ready, sleep 1s')
time.sleep(1)

def optimize_index(client, index_name):
    """Force-merge *index_name* to one segment and block until the merge task completes.

    Submits the merge asynchronously (``wait_for_completion=false``) and polls
    the Tasks API for the returned task id.
    """
    poll_interval_sec = 30
    print(f"Starting force merge for index {index_name}")
    force_merge_endpoint = f'/{index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false'
    force_merge_task_id = client.transport.perform_request('POST', force_merge_endpoint)['task']
    while True:
        # Poll before sleeping so a fast merge is not penalized with a full interval.
        task_status = client.tasks.get(task_id=force_merge_task_id)
        if task_status['completed']:
            break
        time.sleep(poll_interval_sec)
    print(f"Completed force merge for index {index_name}")


def refresh_index(client, index_name):
    """Refresh *index_name*, retrying on any error until the call succeeds.

    A refresh can fail while the cluster is busy (e.g. right after a bulk
    load), so failures are logged and retried after a fixed backoff.
    """
    retry_interval_sec = 30
    print(f"Starting refresh for index {index_name}")
    while True:
        try:
            print("Starting the Refresh Index..")
            client.indices.refresh(index=index_name)
            break
        except Exception as e:
            print(
                f"Refresh errored out. Sleeping for {retry_interval_sec} sec and then Retrying : {e}")
            time.sleep(retry_interval_sec)
    print(f"Completed refresh for index {index_name}")



def main():
client = create_client()
try:
create_index(client, _INDEX_NAME)
bulk_insert(client, _INDEX_NAME)
optimize_index(client, _INDEX_NAME)
refresh_index(client, _INDEX_NAME)
search(client, _INDEX_NAME)
delete_index(client, _INDEX_NAME)
except Exception as e:
Expand Down
42 changes: 42 additions & 0 deletions vectordb_bench/frontend/config/dbCaseConfigs.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,37 @@ class CaseConfigInput(BaseModel):
},
)

# UI control for the OpenSearch HNSW build-time `ef_construction` parameter.
CaseConfigParamInput_EFConstruction_AWSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.EFConstruction,
    inputType=InputType.Number,
    inputConfig={
        "min": 100,
        "max": 1024,
        "value": 256,  # matches AWSOpenSearchIndexConfig.efConstruction default
    },
)

# UI control for the HNSW graph out-degree `m` parameter.
CaseConfigParamInput_M_AWSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.M,
    inputType=InputType.Number,
    inputConfig={
        "min": 4,
        "max": 64,
        "value": 16,  # matches AWSOpenSearchIndexConfig.M default
    },
)

# UI control for the query-time `ef_search` parameter.
CaseConfigParamInput_EF_SEARCH_AWSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.ef_search,
    inputType=InputType.Number,
    inputConfig={
        "min": 100,
        "max": 1024,
        "value": 256,  # matches AWSOpenSearchIndexConfig.efSearch default
    },
)


CaseConfigParamInput_maintenance_work_mem_PgVector = CaseConfigInput(
label=CaseConfigParamType.maintenance_work_mem,
inputHelp="Recommended value: 1.33x the index size, not to exceed the available free memory."
Expand Down Expand Up @@ -839,6 +870,13 @@ class CaseConfigInput(BaseModel):
CaseConfigParamInput_NumCandidates_ES,
]

# Tunables shown for AWS OpenSearch load cases: build-time parameters only.
AWSOpensearchLoadingConfig = [CaseConfigParamInput_EFConstruction_AWSOpensearch, CaseConfigParamInput_M_AWSOpensearch]
# Performance cases additionally expose the query-time ef_search knob.
AWSOpenSearchPerformanceConfig = [
    CaseConfigParamInput_EFConstruction_AWSOpensearch,
    CaseConfigParamInput_M_AWSOpensearch,
    CaseConfigParamInput_EF_SEARCH_AWSOpensearch,
]

PgVectorLoadingConfig = [
CaseConfigParamInput_IndexType_PgVector,
CaseConfigParamInput_Lists_PgVector,
Expand Down Expand Up @@ -920,6 +958,10 @@ class CaseConfigInput(BaseModel):
CaseLabel.Load: ESLoadingConfig,
CaseLabel.Performance: ESPerformanceConfig,
},
DB.AWSOpenSearch: {
CaseLabel.Load: AWSOpensearchLoadingConfig,
CaseLabel.Performance: AWSOpenSearchPerformanceConfig,
},
DB.PgVector: {
CaseLabel.Load: PgVectorLoadingConfig,
CaseLabel.Performance: PgVectorPerformanceConfig,
Expand Down