diff --git a/vectordb_bench/backend/clients/milvus/milvus.py b/vectordb_bench/backend/clients/milvus/milvus.py
index 4e8a35275..58334efe9 100644
--- a/vectordb_bench/backend/clients/milvus/milvus.py
+++ b/vectordb_bench/backend/clients/milvus/milvus.py
@@ -89,6 +89,7 @@ def init(self) -> None:
         connections.disconnect("default")
 
     def _optimize(self):
+        self._post_insert()
         log.info(f"{self.name} optimizing before search")
         try:
             self.col.load()
@@ -116,7 +117,7 @@ def wait_index():
                     time.sleep(5)
 
         wait_index()
-        
+
         # Skip compaction if use GPU indexType
         if self.case_config.index in [IndexType.GPU_CAGRA, IndexType.GPU_IVF_FLAT, IndexType.GPU_IVF_PQ]:
             log.debug("skip compaction for gpu index type.")
@@ -179,8 +180,6 @@ def insert_embeddings(
                 ]
                 res = self.col.insert(insert_data)
                 insert_count += len(res.primary_keys)
-                if kwargs.get("last_batch"):
-                    self._post_insert()
         except MilvusException as e:
             log.info(f"Failed to insert data: {e}")
             return (insert_count, e)
diff --git a/vectordb_bench/backend/clients/pgvector/pgvector.py b/vectordb_bench/backend/clients/pgvector/pgvector.py
index a69b7cbde..afe4218c4 100644
--- a/vectordb_bench/backend/clients/pgvector/pgvector.py
+++ b/vectordb_bench/backend/clients/pgvector/pgvector.py
@@ -10,7 +10,7 @@
 from ..api import IndexType, VectorDB, DBCaseConfig
 
-log = logging.getLogger(__name__) 
+log = logging.getLogger(__name__)
 
 class PgVector(VectorDB):
     """ Use SQLAlchemy instructions"""
 
@@ -36,12 +36,12 @@ def __init__(
         # construct basic units
         self.conn = psycopg2.connect(**self.db_config)
         self.conn.autocommit = False
-        self.cursor = self.conn.cursor() 
-        
+        self.cursor = self.conn.cursor()
+
         # create vector extension
         self.cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
         self.conn.commit()
-        
+
         if drop_old :
             log.info(f"Pgvector client drop table : {self.table_name}")
             # self.pg_table.drop(pg_engine, checkfirst=True)
@@ -49,7 +49,7 @@
             self._drop_table()
             self._create_table(dim)
             self._create_index()
-        
+
         self.cursor.close()
         self.conn.close()
         self.cursor = None
@@ -66,7 +66,7 @@ def init(self) -> None:
         self.conn = psycopg2.connect(**self.db_config)
         self.conn.autocommit = False
         self.cursor = self.conn.cursor()
-        
+
         try:
             yield
         finally:
@@ -74,39 +74,36 @@ def init(self) -> None:
             self.conn.close()
             self.cursor = None
             self.conn = None
-    
+
     def _drop_table(self):
         assert self.conn is not None, "Connection is not initialized"
         assert self.cursor is not None, "Cursor is not initialized"
-        
+
         self.cursor.execute(f'DROP TABLE IF EXISTS public."{self.table_name}"')
         self.conn.commit()
-    
+
     def ready_to_load(self):
         pass
 
     def optimize(self):
-        pass
-
-    def _post_insert(self):
-        log.info(f"{self.name} post insert before optimize")
+        log.info(f"{self.name} optimizing")
         self._drop_index()
         self._create_index()
 
     def ready_to_search(self):
         pass
-    
+
     def _drop_index(self):
         assert self.conn is not None, "Connection is not initialized"
         assert self.cursor is not None, "Cursor is not initialized"
-        
+
         self.cursor.execute(f'DROP INDEX IF EXISTS "{self._index_name}"')
         self.conn.commit()
-    
+
     def _create_index(self):
         assert self.conn is not None, "Connection is not initialized"
         assert self.cursor is not None, "Cursor is not initialized"
-        
+
         index_param = self.case_config.index_param()
         if self.case_config.index == IndexType.HNSW:
             log.debug(f'Creating HNSW index. m={index_param["m"]}, ef_construction={index_param["ef_construction"]}')
@@ -117,11 +114,11 @@ def _create_index(self):
         else:
             assert "Invalid index type {self.case_config.index}"
         self.conn.commit()
-    
+
     def _create_table(self, dim : int):
         assert self.conn is not None, "Connection is not initialized"
         assert self.cursor is not None, "Cursor is not initialized"
-        
+
         try:
             # create table
             self.cursor.execute(f'CREATE TABLE IF NOT EXISTS public."{self.table_name}" (id BIGINT PRIMARY KEY, embedding vector({dim}));')
@@ -151,16 +148,13 @@ def insert_embeddings(
             csv_buffer.seek(0)
             self.cursor.copy_expert(f"COPY public.\"{self.table_name}\" FROM STDIN WITH (FORMAT CSV)", csv_buffer)
             self.conn.commit()
-            
-            if kwargs.get("last_batch"):
-                self._post_insert()
-            
+
             return len(metadata), None
         except Exception as e:
-            log.warning(f"Failed to insert data into pgvector table ({self.table_name}), error: {e}")  
+            log.warning(f"Failed to insert data into pgvector table ({self.table_name}), error: {e}")
             return 0, e
 
-    def search_embedding( 
+    def search_embedding(
         self,
         query: list[float],
         k: int = 100,
@@ -184,4 +178,3 @@ def search_embedding(
         result = self.cursor.fetchall()
 
         return [int(i[0]) for i in result]
-
diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py
index aeed0ec74..e4861abd1 100644
--- a/vectordb_bench/backend/runner/serial_runner.py
+++ b/vectordb_bench/backend/runner/serial_runner.py
@@ -46,11 +46,9 @@ def task(self) -> int:
                 del(emb_np)
                 log.debug(f"batch dataset size: {len(all_embeddings)}, {len(all_metadata)}")
 
-                last_batch = self.dataset.data.size - count == len(all_metadata)
                 insert_count, error = self.db.insert_embeddings(
                     embeddings=all_embeddings,
                     metadata=all_metadata,
-                    last_batch=last_batch,
                 )
                 if error is not None:
                     raise error
diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py
index a5f4e317f..c5c368d02 100644
--- a/vectordb_bench/backend/task_runner.py
+++ b/vectordb_bench/backend/task_runner.py
@@ -140,8 +140,8 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
             )
 
             self._init_search_runner()
-            m.recall, m.serial_latency_p99 = self._serial_search()
             m.qps = self._conc_search()
+            m.recall, m.serial_latency_p99 = self._serial_search()
         except Exception as e:
             log.warning(f"Failed to run performance case, reason = {e}")
             traceback.print_exc()
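In effect, this patch turns index maintenance into an explicit post-load stage instead of a side effect of the final insert batch: insert_embeddings() now only loads data, the last_batch kwarg disappears from the serial runner, and each client rebuilds its index once in optimize() (pgvector drops and recreates the index; Milvus calls _post_insert() at the start of _optimize(), still skipping compaction for GPU index types). It also runs the concurrent QPS search before the serial recall/latency search. The following is a minimal sketch of the resulting load flow, for illustration only; MinimalClient and run_load_stage are hypothetical names, not part of the patch or the VectorDBBench API.

# Illustrative sketch (not part of the patch) of the load/optimize split
# this diff introduces. Method names mirror the VectorDB API shown above;
# the client and runner here are hypothetical simplifications.

class MinimalClient:
    def insert_embeddings(self, embeddings: list[list[float]],
                          metadata: list[int], **kwargs) -> tuple[int, Exception | None]:
        # Pure data loading: no index maintenance here anymore, and no
        # need for a "last_batch" hint from the caller.
        return len(metadata), None

    def optimize(self) -> None:
        # One-shot, post-load index rebuild, e.g. pgvector's
        # drop-then-recreate of the vector index.
        pass

def run_load_stage(client: MinimalClient, batches) -> int:
    total = 0
    for embeddings, metadata in batches:
        # Every batch is treated identically; the runner no longer
        # computes whether this is the final one.
        count, err = client.insert_embeddings(embeddings=embeddings, metadata=metadata)
        if err is not None:
            raise err
        total += count
    client.optimize()  # single optimization pass once loading is done
    return total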