From afc8fa89871322b2fdd20552c385fbf3b8bdedb0 Mon Sep 17 00:00:00 2001 From: ShawnXing Date: Fri, 29 Nov 2024 15:49:45 +0800 Subject: [PATCH 1/2] add aliyun elasticsearch --- vectordb_bench/backend/clients/__init__.py | 13 ++ .../aliyun_elasticsearch.py | 162 ++++++++++++++++++ .../clients/aliyun_elasticsearch/config.py | 60 +++++++ .../frontend/config/dbCaseConfigs.py | 39 +++++ 4 files changed, 274 insertions(+) create mode 100644 vectordb_bench/backend/clients/aliyun_elasticsearch/aliyun_elasticsearch.py create mode 100644 vectordb_bench/backend/clients/aliyun_elasticsearch/config.py diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py index 8859381b..53c1289e 100644 --- a/vectordb_bench/backend/clients/__init__.py +++ b/vectordb_bench/backend/clients/__init__.py @@ -37,6 +37,7 @@ class DB(Enum): MemoryDB = "MemoryDB" Chroma = "Chroma" AWSOpenSearch = "OpenSearch" + AliyunElasticsearch = "AliyunElasticsearch" Test = "test" @@ -103,6 +104,10 @@ def init_cls(self) -> Type[VectorDB]: from .alloydb.alloydb import AlloyDB return AlloyDB + if self == DB.AliyunElasticsearch: + from .aliyun_elasticsearch.aliyun_elasticsearch import AliyunElasticsearch + return AliyunElasticsearch + @property def config_cls(self) -> Type[DBConfig]: """Import while in use""" @@ -166,6 +171,10 @@ def config_cls(self) -> Type[DBConfig]: from .alloydb.config import AlloyDBConfig return AlloyDBConfig + if self == DB.AliyunElasticsearch: + from .aliyun_elasticsearch.config import AliyunElasticsearchConfig + return AliyunElasticsearchConfig + def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseConfig]: if self == DB.Milvus: from .milvus.config import _milvus_case_config @@ -211,6 +220,10 @@ def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseCon from .alloydb.config import _alloydb_case_config return _alloydb_case_config.get(index_type) + if self == DB.AliyunElasticsearch: + from .aliyun_elasticsearch.config import AliyunElasticsearchIndexConfig + return AliyunElasticsearchIndexConfig + # DB.Pinecone, DB.Chroma, DB.Redis return EmptyDBCaseConfig diff --git a/vectordb_bench/backend/clients/aliyun_elasticsearch/aliyun_elasticsearch.py b/vectordb_bench/backend/clients/aliyun_elasticsearch/aliyun_elasticsearch.py new file mode 100644 index 00000000..8c8caa25 --- /dev/null +++ b/vectordb_bench/backend/clients/aliyun_elasticsearch/aliyun_elasticsearch.py @@ -0,0 +1,162 @@ +import logging +import time +from contextlib import contextmanager +from typing import Iterable +from ..api import VectorDB +from .config import AliyunElasticsearchIndexConfig +from elasticsearch.helpers import bulk + + +for logger in ("elasticsearch", "elastic_transport"): + logging.getLogger(logger).setLevel(logging.WARNING) + +log = logging.getLogger(__name__) + +class AliyunElasticsearch(VectorDB): + def __init__( + self, + dim: int, + db_config: dict, + db_case_config: AliyunElasticsearchIndexConfig, + indice: str = "vdb_bench_indice", # must be lowercase + id_col_name: str = "id", + vector_col_name: str = "vector", + drop_old: bool = False, + **kwargs, + ): + self.dim = dim + self.db_config = db_config + self.case_config = db_case_config + self.indice = indice + self.id_col_name = id_col_name + self.vector_col_name = vector_col_name + + from elasticsearch import Elasticsearch + + client = Elasticsearch(**self.db_config) + + if drop_old: + log.info(f"Elasticsearch client drop_old indices: {self.indice}") + is_existed_res = 
client.indices.exists(index=self.indice) + if is_existed_res.raw: + client.indices.delete(index=self.indice) + self._create_indice(client) + + @contextmanager + def init(self) -> None: + """connect to elasticsearch""" + from elasticsearch import Elasticsearch + self.client = Elasticsearch(**self.db_config, request_timeout=180) + + yield + # self.client.transport.close() + self.client = None + del(self.client) + + def _create_indice(self, client) -> None: + mappings = { + "_source": {"excludes": [self.vector_col_name]}, + "properties": { + self.id_col_name: {"type": "integer", "store": True}, + self.vector_col_name: { + "dims": self.dim, + **self.case_config.index_param(), + }, + } + } + + try: + client.indices.create(index=self.indice, mappings=mappings) + except Exception as e: + log.warning(f"Failed to create indice: {self.indice} error: {str(e)}") + raise e from None + + def insert_embeddings( + self, + embeddings: Iterable[list[float]], + metadata: list[int], + **kwargs, + ) -> (int, Exception): + """Insert the embeddings to the elasticsearch.""" + assert self.client is not None, "should self.init() first" + + insert_data = [ + { + "_index": self.indice, + "_source": { + self.id_col_name: metadata[i], + self.vector_col_name: embeddings[i], + }, + } + for i in range(len(embeddings)) + ] + try: + bulk_insert_res = bulk(self.client, insert_data) + return (bulk_insert_res[0], None) + except Exception as e: + log.warning(f"Failed to insert data: {self.indice} error: {str(e)}") + return (0, e) + + def search_embedding( + self, + query: list[float], + k: int = 100, + filters: dict | None = None, + ) -> list[int]: + """Get k most similar embeddings to query vector. + + Args: + query(list[float]): query embedding to look up documents similar to. + k(int): Number of most similar embeddings to return. Defaults to 100. + filters(dict, optional): filtering expression to filter the data while searching. + + Returns: + list[tuple[int, float]]: list of k most similar embeddings in (id, score) tuple to the query embedding. 
+ """ + assert self.client is not None, "should self.init() first" + # is_existed_res = self.client.indices.exists(index=self.indice) + # assert is_existed_res.raw == True, "should self.init() first" + + knn = { + "field": self.vector_col_name, + "k": k, + "num_candidates": self.case_config.num_candidates, + "filter": [{"range": {self.id_col_name: {"gt": filters["id"]}}}] + if filters + else [], + "query_vector": query, + } + size = k + try: + res = self.client.search( + index=self.indice, + knn=knn, + size=size, + _source=False, + docvalue_fields=[self.id_col_name], + stored_fields="_none_", + filter_path=[f"hits.hits.fields.{self.id_col_name}"], + ) + res = [h["fields"][self.id_col_name][0] for h in res["hits"]["hits"]] + + return res + except Exception as e: + log.warning(f"Failed to search: {self.indice} error: {str(e)}") + raise e from None + + def optimize(self): + """optimize will be called between insertion and search in performance cases.""" + assert self.client is not None, "should self.init() first" + self.client.indices.refresh(index=self.indice) + force_merge_task_id = self.client.indices.forcemerge(index=self.indice, max_num_segments=1, wait_for_completion=False)['task'] + log.info(f"Elasticsearch force merge task id: {force_merge_task_id}") + SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30 + while True: + time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC) + task_status = self.client.tasks.get(task_id=force_merge_task_id) + if task_status['completed']: + return + + def ready_to_load(self): + """ready_to_load will be called before load in load cases.""" + pass diff --git a/vectordb_bench/backend/clients/aliyun_elasticsearch/config.py b/vectordb_bench/backend/clients/aliyun_elasticsearch/config.py new file mode 100644 index 00000000..bbf4acc7 --- /dev/null +++ b/vectordb_bench/backend/clients/aliyun_elasticsearch/config.py @@ -0,0 +1,60 @@ +from enum import Enum +from pydantic import SecretStr, BaseModel + +from ..api import DBConfig, DBCaseConfig, MetricType, IndexType + + +class AliyunElasticsearchConfig(DBConfig, BaseModel): + #: Protocol in use to connect to the node + scheme: str = "http" + host: str = "" + port: int = 9200 + user: str = "elastic" + password: SecretStr + + def to_dict(self) -> dict: + return { + "hosts": [{'scheme': self.scheme, 'host': self.host, 'port': self.port}], + "basic_auth": (self.user, self.password.get_secret_value()), + } + + +class ESElementType(str, Enum): + float = "float" # 4 byte + byte = "byte" # 1 byte, -128 to 127 + + +class AliyunElasticsearchIndexConfig(BaseModel, DBCaseConfig): + element_type: ESElementType = ESElementType.float + index: IndexType = IndexType.ES_HNSW # ES only support 'hnsw' + + metric_type: MetricType | None = None + efConstruction: int | None = None + M: int | None = None + num_candidates: int | None = None + + def parse_metric(self) -> str: + if self.metric_type == MetricType.L2: + return "l2_norm" + elif self.metric_type == MetricType.IP: + return "dot_product" + return "cosine" + + def index_param(self) -> dict: + params = { + "type": "dense_vector", + "index": True, + "element_type": self.element_type.value, + "similarity": self.parse_metric(), + "index_options": { + "type": self.index.value, + "m": self.M, + "ef_construction": self.efConstruction, + }, + } + return params + + def search_param(self) -> dict: + return { + "num_candidates": self.num_candidates, + } diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py index f8632105..88794e30 100644 
--- a/vectordb_bench/frontend/config/dbCaseConfigs.py +++ b/vectordb_bench/frontend/config/dbCaseConfigs.py @@ -1040,6 +1040,35 @@ class CaseConfigInput(BaseModel): }, ) +CaseConfigParamInput_EFConstruction_AliES = CaseConfigInput( + label=CaseConfigParamType.EFConstruction, + inputType=InputType.Number, + inputConfig={ + "min": 8, + "max": 512, + "value": 360, + }, +) + +CaseConfigParamInput_M_AliES = CaseConfigInput( + label=CaseConfigParamType.M, + inputType=InputType.Number, + inputConfig={ + "min": 4, + "max": 64, + "value": 30, + }, +) +CaseConfigParamInput_NumCandidates_AliES = CaseConfigInput( + label=CaseConfigParamType.numCandidates, + inputType=InputType.Number, + inputConfig={ + "min": 1, + "max": 10000, + "value": 100, + }, +) + MilvusLoadConfig = [ CaseConfigParamInput_IndexType, @@ -1206,6 +1235,12 @@ class CaseConfigInput(BaseModel): CaseConfigParamInput_max_parallel_workers_AlloyDB, ] +AliyunElasticsearchLoadingConfig = [CaseConfigParamInput_EFConstruction_AliES, CaseConfigParamInput_M_AliES] +AliyunElasticsearchPerformanceConfig = [ + CaseConfigParamInput_EFConstruction_AliES, + CaseConfigParamInput_M_AliES, + CaseConfigParamInput_NumCandidates_AliES, +] CASE_CONFIG_MAP = { DB.Milvus: { @@ -1247,4 +1282,8 @@ class CaseConfigInput(BaseModel): CaseLabel.Load: AlloyDBLoadConfig, CaseLabel.Performance: AlloyDBPerformanceConfig, }, + DB.AliyunElasticsearch: { + CaseLabel.Load: AliyunElasticsearchLoadingConfig, + CaseLabel.Performance: AliyunElasticsearchPerformanceConfig, + }, } From 33b8b3dbb4dff5ba7bb97a2e4f7ee3619b1bd0fd Mon Sep 17 00:00:00 2001 From: ShawnXing Date: Tue, 10 Dec 2024 16:37:35 +0800 Subject: [PATCH 2/2] code reuse --- vectordb_bench/backend/clients/__init__.py | 4 +- .../aliyun_elasticsearch.py | 163 ++---------------- .../clients/aliyun_elasticsearch/config.py | 41 ----- 3 files changed, 16 insertions(+), 192 deletions(-) diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py index 53c1289e..ba78c35c 100644 --- a/vectordb_bench/backend/clients/__init__.py +++ b/vectordb_bench/backend/clients/__init__.py @@ -221,8 +221,8 @@ def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseCon return _alloydb_case_config.get(index_type) if self == DB.AliyunElasticsearch: - from .aliyun_elasticsearch.config import AliyunElasticsearchIndexConfig - return AliyunElasticsearchIndexConfig + from .elastic_cloud.config import ElasticCloudIndexConfig + return ElasticCloudIndexConfig # DB.Pinecone, DB.Chroma, DB.Redis return EmptyDBCaseConfig diff --git a/vectordb_bench/backend/clients/aliyun_elasticsearch/aliyun_elasticsearch.py b/vectordb_bench/backend/clients/aliyun_elasticsearch/aliyun_elasticsearch.py index 8c8caa25..41253ca1 100644 --- a/vectordb_bench/backend/clients/aliyun_elasticsearch/aliyun_elasticsearch.py +++ b/vectordb_bench/backend/clients/aliyun_elasticsearch/aliyun_elasticsearch.py @@ -1,162 +1,27 @@ -import logging -import time -from contextlib import contextmanager -from typing import Iterable -from ..api import VectorDB -from .config import AliyunElasticsearchIndexConfig -from elasticsearch.helpers import bulk +from ..elastic_cloud.elastic_cloud import ElasticCloud +from ..elastic_cloud.config import ElasticCloudIndexConfig -for logger in ("elasticsearch", "elastic_transport"): - logging.getLogger(logger).setLevel(logging.WARNING) - -log = logging.getLogger(__name__) - -class AliyunElasticsearch(VectorDB): +class AliyunElasticsearch(ElasticCloud): def __init__( self, dim: int, db_config: dict, 
- db_case_config: AliyunElasticsearchIndexConfig, + db_case_config: ElasticCloudIndexConfig, indice: str = "vdb_bench_indice", # must be lowercase id_col_name: str = "id", vector_col_name: str = "vector", drop_old: bool = False, **kwargs, ): - self.dim = dim - self.db_config = db_config - self.case_config = db_case_config - self.indice = indice - self.id_col_name = id_col_name - self.vector_col_name = vector_col_name - - from elasticsearch import Elasticsearch - - client = Elasticsearch(**self.db_config) - - if drop_old: - log.info(f"Elasticsearch client drop_old indices: {self.indice}") - is_existed_res = client.indices.exists(index=self.indice) - if is_existed_res.raw: - client.indices.delete(index=self.indice) - self._create_indice(client) - - @contextmanager - def init(self) -> None: - """connect to elasticsearch""" - from elasticsearch import Elasticsearch - self.client = Elasticsearch(**self.db_config, request_timeout=180) - - yield - # self.client.transport.close() - self.client = None - del(self.client) - - def _create_indice(self, client) -> None: - mappings = { - "_source": {"excludes": [self.vector_col_name]}, - "properties": { - self.id_col_name: {"type": "integer", "store": True}, - self.vector_col_name: { - "dims": self.dim, - **self.case_config.index_param(), - }, - } - } - - try: - client.indices.create(index=self.indice, mappings=mappings) - except Exception as e: - log.warning(f"Failed to create indice: {self.indice} error: {str(e)}") - raise e from None - - def insert_embeddings( - self, - embeddings: Iterable[list[float]], - metadata: list[int], - **kwargs, - ) -> (int, Exception): - """Insert the embeddings to the elasticsearch.""" - assert self.client is not None, "should self.init() first" - - insert_data = [ - { - "_index": self.indice, - "_source": { - self.id_col_name: metadata[i], - self.vector_col_name: embeddings[i], - }, - } - for i in range(len(embeddings)) - ] - try: - bulk_insert_res = bulk(self.client, insert_data) - return (bulk_insert_res[0], None) - except Exception as e: - log.warning(f"Failed to insert data: {self.indice} error: {str(e)}") - return (0, e) - - def search_embedding( - self, - query: list[float], - k: int = 100, - filters: dict | None = None, - ) -> list[int]: - """Get k most similar embeddings to query vector. - - Args: - query(list[float]): query embedding to look up documents similar to. - k(int): Number of most similar embeddings to return. Defaults to 100. - filters(dict, optional): filtering expression to filter the data while searching. - - Returns: - list[tuple[int, float]]: list of k most similar embeddings in (id, score) tuple to the query embedding. 
- """ - assert self.client is not None, "should self.init() first" - # is_existed_res = self.client.indices.exists(index=self.indice) - # assert is_existed_res.raw == True, "should self.init() first" - - knn = { - "field": self.vector_col_name, - "k": k, - "num_candidates": self.case_config.num_candidates, - "filter": [{"range": {self.id_col_name: {"gt": filters["id"]}}}] - if filters - else [], - "query_vector": query, - } - size = k - try: - res = self.client.search( - index=self.indice, - knn=knn, - size=size, - _source=False, - docvalue_fields=[self.id_col_name], - stored_fields="_none_", - filter_path=[f"hits.hits.fields.{self.id_col_name}"], - ) - res = [h["fields"][self.id_col_name][0] for h in res["hits"]["hits"]] - - return res - except Exception as e: - log.warning(f"Failed to search: {self.indice} error: {str(e)}") - raise e from None - - def optimize(self): - """optimize will be called between insertion and search in performance cases.""" - assert self.client is not None, "should self.init() first" - self.client.indices.refresh(index=self.indice) - force_merge_task_id = self.client.indices.forcemerge(index=self.indice, max_num_segments=1, wait_for_completion=False)['task'] - log.info(f"Elasticsearch force merge task id: {force_merge_task_id}") - SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30 - while True: - time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC) - task_status = self.client.tasks.get(task_id=force_merge_task_id) - if task_status['completed']: - return + super().__init__( + dim=dim, + db_config=db_config, + db_case_config=db_case_config, + indice=indice, + id_col_name=id_col_name, + vector_col_name=vector_col_name, + drop_old=drop_old, + **kwargs, + ) - def ready_to_load(self): - """ready_to_load will be called before load in load cases.""" - pass diff --git a/vectordb_bench/backend/clients/aliyun_elasticsearch/config.py b/vectordb_bench/backend/clients/aliyun_elasticsearch/config.py index bbf4acc7..a2de4dc7 100644 --- a/vectordb_bench/backend/clients/aliyun_elasticsearch/config.py +++ b/vectordb_bench/backend/clients/aliyun_elasticsearch/config.py @@ -17,44 +17,3 @@ def to_dict(self) -> dict: "hosts": [{'scheme': self.scheme, 'host': self.host, 'port': self.port}], "basic_auth": (self.user, self.password.get_secret_value()), } - - -class ESElementType(str, Enum): - float = "float" # 4 byte - byte = "byte" # 1 byte, -128 to 127 - - -class AliyunElasticsearchIndexConfig(BaseModel, DBCaseConfig): - element_type: ESElementType = ESElementType.float - index: IndexType = IndexType.ES_HNSW # ES only support 'hnsw' - - metric_type: MetricType | None = None - efConstruction: int | None = None - M: int | None = None - num_candidates: int | None = None - - def parse_metric(self) -> str: - if self.metric_type == MetricType.L2: - return "l2_norm" - elif self.metric_type == MetricType.IP: - return "dot_product" - return "cosine" - - def index_param(self) -> dict: - params = { - "type": "dense_vector", - "index": True, - "element_type": self.element_type.value, - "similarity": self.parse_metric(), - "index_options": { - "type": self.index.value, - "m": self.M, - "ef_construction": self.efConstruction, - }, - } - return params - - def search_param(self) -> dict: - return { - "num_candidates": self.num_candidates, - }
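
Usage sketch (supplementary, not part of either patch): a rough end-to-end flow through the new DB.AliyunElasticsearch registration, assuming a reachable Aliyun Elasticsearch instance and assuming the shared ElasticCloud client exposes the same insert/search/optimize methods that the first commit implemented inline. The endpoint, password, and dimension below are placeholders.

from vectordb_bench.backend.clients import DB
from vectordb_bench.backend.clients.api import MetricType

db = DB.AliyunElasticsearch

# AliyunElasticsearchConfig.to_dict() yields the kwargs handed to elasticsearch.Elasticsearch(),
# i.e. {"hosts": [{"scheme": ..., "host": ..., "port": ...}], "basic_auth": (user, password)}.
db_config = db.config_cls(
    host="es-cn-xxxx.elasticsearch.aliyuncs.com",  # placeholder endpoint
    port=9200,
    user="elastic",
    password="your-password",  # placeholder credential
).to_dict()

# HNSW index/search parameters; the values mirror the frontend defaults added in dbCaseConfigs.py.
case_config = db.case_config_cls()(
    metric_type=MetricType.L2,  # parse_metric() maps this to "l2_norm"
    efConstruction=360,
    M=30,
    num_candidates=100,
)

client = db.init_cls(
    dim=768,
    db_config=db_config,
    db_case_config=case_config,
    drop_old=True,  # drop and recreate the "vdb_bench_indice" index
)

with client.init():  # init() is a contextmanager that opens the connection with request_timeout=180
    client.insert_embeddings(embeddings=[[0.1] * 768, [0.2] * 768], metadata=[0, 1])
    client.optimize()  # refresh, then force merge before searching
    ids = client.search_embedding([0.1] * 768, k=10)

The benchmark runner drives these same calls itself; the sketch only shows how the pieces registered in clients/__init__.py fit together.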
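
For reference, these are the request bodies that the first commit's _create_indice() and search_embedding() assemble from the case config (the shared ElasticCloud client used after the refactor builds the equivalent bodies). The dimension, the cosine similarity, and the filter value are illustrative; m, ef_construction, and num_candidates are the frontend defaults.

# Mapping passed to client.indices.create(index="vdb_bench_indice", mappings=...).
mappings = {
    "_source": {"excludes": ["vector"]},  # keep raw vectors out of _source
    "properties": {
        "id": {"type": "integer", "store": True},
        "vector": {
            "dims": 768,
            "type": "dense_vector",
            "index": True,
            "element_type": "float",  # ESElementType: "float" (4 bytes) or "byte" (1 byte)
            "similarity": "cosine",   # parse_metric(): "l2_norm", "dot_product", or "cosine"
            "index_options": {"type": "hnsw", "m": 30, "ef_construction": 360},
        },
    },
}

# kNN clause passed to client.search(index=..., knn=knn, size=k).
knn = {
    "field": "vector",
    "k": 100,
    "num_candidates": 100,  # candidates examined per shard; must be >= k
    "filter": [{"range": {"id": {"gt": 10000}}}],  # only for filtered cases, [] otherwise
    "query_vector": [0.0] * 768,  # placeholder query embedding
}

Excluding the vector field from _source, returning only docvalue_fields for the id, and force merging to a single segment in optimize() keep both the search responses and the number of HNSW graphs to traverse small during the benchmark.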