From 91a019ff3a1942ffdf0458e43c2c89c0092721c6 Mon Sep 17 00:00:00 2001 From: yangxuan Date: Sat, 14 Sep 2024 15:48:49 +0800 Subject: [PATCH] Add rate runner Signed-off-by: yangxuan --- tests/test_rate_runner.py | 88 ++++++++++++++ vectordb_bench/__init__.py | 2 +- .../backend/clients/milvus/milvus.py | 5 +- vectordb_bench/backend/dataset.py | 13 +- vectordb_bench/backend/runner/mp_runner.py | 89 +++++++++++++- vectordb_bench/backend/runner/rate_runner.py | 79 ++++++++++++ .../backend/runner/read_write_runner.py | 112 ++++++++++++++++++ vectordb_bench/backend/runner/util.py | 32 +++++ vectordb_bench/backend/task_runner.py | 4 +- 9 files changed, 413 insertions(+), 11 deletions(-) create mode 100644 tests/test_rate_runner.py create mode 100644 vectordb_bench/backend/runner/rate_runner.py create mode 100644 vectordb_bench/backend/runner/read_write_runner.py create mode 100644 vectordb_bench/backend/runner/util.py diff --git a/tests/test_rate_runner.py b/tests/test_rate_runner.py new file mode 100644 index 000000000..363b7a33c --- /dev/null +++ b/tests/test_rate_runner.py @@ -0,0 +1,88 @@ +from typing import Iterable +import argparse +from vectordb_bench.backend.dataset import Dataset, DatasetSource +from vectordb_bench.backend.runner.rate_runner import RatedMultiThreadingInsertRunner +from vectordb_bench.backend.runner.read_write_runner import ReadWriteRunner +from vectordb_bench.backend.clients import DB, VectorDB +from vectordb_bench.backend.clients.milvus.config import FLATConfig +from vectordb_bench.backend.clients.zilliz_cloud.config import AutoIndexConfig + +import logging + +log = logging.getLogger("vectordb_bench") +log.setLevel(logging.DEBUG) + +def get_rate_runner(db): + cohere = Dataset.COHERE.manager(100_000) + prepared = cohere.prepare(DatasetSource.AliyunOSS) + assert prepared + runner = RatedMultiThreadingInsertRunner( + rate = 10, + db = db, + dataset = cohere, + ) + + return runner + +def test_rate_runner(db, insert_rate): + runner = get_rate_runner(db) + + _, t = runner.run_with_rate() + log.info(f"insert run done, time={t}") + +def test_read_write_runner(db, insert_rate, conc: list, search_stage: Iterable[float], read_dur_after_write: int, local: bool=False): + cohere = Dataset.COHERE.manager(1_000_000) + if local is True: + source = DatasetSource.AliyunOSS + else: + source = DatasetSource.S3 + prepared = cohere.prepare(source) + assert prepared + + rw_runner = ReadWriteRunner( + db=db, + dataset=cohere, + insert_rate=insert_rate, + search_stage=search_stage, + read_dur_after_write=read_dur_after_write, + concurrencies=conc + ) + rw_runner.run_read_write() + + +def get_db(db: str, config: dict) -> VectorDB: + if db == DB.Milvus.name: + return DB.Milvus.init_cls(dim=768, db_config=config, db_case_config=FLATConfig(metric_type="COSINE"), drop_old=True, pre_load=True) + elif db == DB.ZillizCloud.name: + return DB.ZillizCloud.init_cls(dim=768, db_config=config, db_case_config=AutoIndexConfig(metric_type="COSINE"), drop_old=True, pre_load=True) + else: + raise ValueError(f"unknown db: {db}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-r", "--insert_rate", type=int, default="1000", help="insert entity row count per seconds, cps") + parser.add_argument("-d", "--db", type=str, default=DB.Milvus.name, help="db name") + parser.add_argument("-t", "--duration", type=int, default=300, help="stage search duration in seconds") + parser.add_argument("--use_s3", action='store_true', help="whether to use S3 dataset") + + flags = parser.parse_args() + + # TODO read uri, user, password from .env + config = { + "uri": "http://localhost:19530", + "user": "", + "password": "", + } + + conc = (1, 15, 50) + search_stage = (0.5, 0.6, 0.7, 0.8, 0.9, 1.0) + + db = get_db(flags.db, config) + test_read_write_runner( + db=db, + insert_rate=flags.insert_rate, + conc=conc, + search_stage=search_stage, + read_dur_after_write=flags.duration, + local=flags.use_s3) diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py index f81c9bb3f..568e97705 100644 --- a/vectordb_bench/__init__.py +++ b/vectordb_bench/__init__.py @@ -17,7 +17,7 @@ class config: DEFAULT_DATASET_URL = env.str("DEFAULT_DATASET_URL", AWS_S3_URL) DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", "/tmp/vectordb_bench/dataset") - NUM_PER_BATCH = env.int("NUM_PER_BATCH", 5000) + NUM_PER_BATCH = env.int("NUM_PER_BATCH", 100) DROP_OLD = env.bool("DROP_OLD", True) USE_SHUFFLED_DATA = env.bool("USE_SHUFFLED_DATA", True) diff --git a/vectordb_bench/backend/clients/milvus/milvus.py b/vectordb_bench/backend/clients/milvus/milvus.py index 4590265ae..8f8571aa3 100644 --- a/vectordb_bench/backend/clients/milvus/milvus.py +++ b/vectordb_bench/backend/clients/milvus/milvus.py @@ -66,7 +66,8 @@ def __init__( self.case_config.index_param(), index_name=self._index_name, ) - # self._pre_load(coll) + if kwargs.get("pre_load") is True: + self._pre_load(col) connections.disconnect("default") @@ -92,7 +93,7 @@ def _optimize(self): self._post_insert() log.info(f"{self.name} optimizing before search") try: - self.col.load() + self.col.load(refresh=True) except Exception as e: log.warning(f"{self.name} optimize error: {e}") raise e from None diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index d559eb6be..d62d96684 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -57,11 +57,11 @@ class CustomDataset(BaseDataset): dir: str file_num: int isCustom: bool = True - + @validator("size") def verify_size(cls, v): return v - + @property def label(self) -> str: return "Custom" @@ -73,7 +73,8 @@ def dir_name(self) -> str: @property def file_count(self) -> int: return self.file_num - + + class LAION(BaseDataset): name: str = "LAION" dim: int = 768 @@ -242,13 +243,15 @@ def __init__(self, dataset: DatasetManager): self._cur = None self._sub_idx = [0 for i in range(len(self._ds.train_files))] # iter num for each file + def __iter__(self): + return self + def _get_iter(self, file_name: str): p = pathlib.Path(self._ds.data_dir, file_name) log.info(f"Get iterator for {p.name}") if not p.exists(): raise IndexError(f"No such file {p}") - log.warning(f"No such file: {p}") - return ParquetFile(p).iter_batches(config.NUM_PER_BATCH) + return ParquetFile(p, memory_map=True, pre_buffer=True).iter_batches(config.NUM_PER_BATCH) def __next__(self) -> pd.DataFrame: """return the data in the next file of the training list""" diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py index 6a9f7c979..596865b42 100644 --- a/vectordb_bench/backend/runner/mp_runner.py +++ b/vectordb_bench/backend/runner/mp_runner.py @@ -64,7 +64,7 @@ def search(self, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) log.warning(f"VectorDB search_embedding error: {e}") traceback.print_exc(chain=True) raise e from None - + latencies.append(time.perf_counter() - s) count += 1 # loop through the test data @@ -87,6 +87,8 @@ def get_mp_context(): log.debug(f"MultiProcessingSearchRunner get multiprocessing start method: {mp_start_method}") return mp.get_context(mp_start_method) + + def _run_all_concurrencies_mem_efficient(self) -> float: max_qps = 0 conc_num_list = [] @@ -145,3 +147,88 @@ def run(self) -> float: def stop(self) -> None: pass + + def run_by_dur(self, duration: int) -> float: + return self._run_by_dur(duration) + + def _run_by_dur(self, duration: int) -> float: + max_qps = 0 + try: + for conc in self.concurrencies: + with mp.Manager() as m: + q, cond = m.Queue(), m.Condition() + with concurrent.futures.ProcessPoolExecutor(mp_context=self.get_mp_context(), max_workers=conc) as executor: + log.info(f"Start search_by_dur {duration}s in concurrency {conc}, filters: {self.filters}") + future_iter = [executor.submit(self.search_by_dur, duration, self.test_data, q, cond) for i in range(conc)] + # Sync all processes + while q.qsize() < conc: + sleep_t = conc if conc < 10 else 10 + time.sleep(sleep_t) + + with cond: + cond.notify_all() + log.info(f"Syncing all process and start concurrency search, concurrency={conc}") + + start = time.perf_counter() + all_count = sum([r.result() for r in future_iter]) + cost = time.perf_counter() - start + + qps = round(all_count / cost, 4) + log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}") + + if qps > max_qps: + max_qps = qps + log.info(f"Update largest qps with concurrency {conc}: current max_qps={max_qps}") + except Exception as e: + log.warning(f"Fail to search all concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}") + traceback.print_exc() + + # No results available, raise exception + if max_qps == 0.0: + raise e from None + + finally: + self.stop() + + return max_qps + + + def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> int: + # sync all process + q.put(1) + with cond: + cond.wait() + + with self.db.init(): + num, idx = len(test_data), random.randint(0, len(test_data) - 1) + + start_time = time.perf_counter() + count = 0 + while time.perf_counter() < start_time + dur: + s = time.perf_counter() + try: + self.db.search_embedding( + test_data[idx], + self.k, + self.filters, + ) + except Exception as e: + log.warning(f"VectorDB search_embedding error: {e}") + traceback.print_exc(chain=True) + raise e from None + + count += 1 + # loop through the test data + idx = idx + 1 if idx < num - 1 else 0 + + if count % 500 == 0: + log.debug(f"({mp.current_process().name:16}) search_count: {count}, latest_latency={time.perf_counter()-s}") + + total_dur = round(time.perf_counter() - start_time, 4) + log.debug( + f"{mp.current_process().name:16} search {self.duration}s: " + f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}" + ) + + return count + diff --git a/vectordb_bench/backend/runner/rate_runner.py b/vectordb_bench/backend/runner/rate_runner.py new file mode 100644 index 000000000..4b6d7f6cf --- /dev/null +++ b/vectordb_bench/backend/runner/rate_runner.py @@ -0,0 +1,79 @@ +import logging +import time +from concurrent.futures import ThreadPoolExecutor +import multiprocessing as mp + + +from vectordb_bench.backend.clients import api +from vectordb_bench.backend.dataset import DataSetIterator +from vectordb_bench.backend.utils import time_it +from vectordb_bench import config + +from .util import get_data, is_futures_completed, get_future_exceptions +log = logging.getLogger(__name__) + + +class RatedMultiThreadingInsertRunner: + def __init__( + self, + rate: int, # numRows per second + db: api.VectorDB, + dataset_iter: DataSetIterator, + normalize: bool = False, + timeout: float | None = None, + ): + self.timeout = timeout if isinstance(timeout, (int, float)) else None + self.dataset = dataset_iter + self.db = db + self.normalize = normalize + self.insert_rate = rate + self.batch_rate = rate // config.NUM_PER_BATCH + + def send_insert_task(self, db, emb: list[list[float]], metadata: list[str]): + db.insert_embeddings(emb, metadata) + + @time_it + def run_with_rate(self, q: mp.Queue): + with ThreadPoolExecutor(max_workers=mp.cpu_count()) as executor: + executing_futures = [] + + @time_it + def submit_by_rate() -> bool: + rate = self.batch_rate + for data in self.dataset: + emb, metadata = get_data(data, self.normalize) + executing_futures.append(executor.submit(self.send_insert_task, self.db, emb, metadata)) + rate -= 1 + + if rate == 0: + return False + return rate == self.batch_rate + + with self.db.init(): + while True: + start_time = time.perf_counter() + finished, elapsed_time = submit_by_rate() + if finished is True: + q.put(None, block=True) + log.info(f"End of dataset, left unfinished={len(executing_futures)}") + return + + q.put(True, block=False) + wait_interval = 1 - elapsed_time if elapsed_time < 1 else 0.001 + + e, completed = is_futures_completed(executing_futures, wait_interval) + if completed is True: + ex = get_future_exceptions(executing_futures) + if ex is not None: + log.warn(f"task error, terminating, err={ex}") + q.put(None) + executor.shutdown(wait=True, cancel_futures=True) + raise ex + else: + log.debug(f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} task in 1s, wait_interval={wait_interval:.2f}") + executing_futures = [] + else: + log.warning(f"Failed to finish tasks in 1s, {e}, waited={wait_interval:.2f}, try to check the next round") + dur = time.perf_counter() - start_time + if dur < 1: + time.sleep(1 - dur) diff --git a/vectordb_bench/backend/runner/read_write_runner.py b/vectordb_bench/backend/runner/read_write_runner.py new file mode 100644 index 000000000..6e043dceb --- /dev/null +++ b/vectordb_bench/backend/runner/read_write_runner.py @@ -0,0 +1,112 @@ +import logging +from typing import Iterable +import multiprocessing as mp +import concurrent +import numpy as np +import math + +from .mp_runner import MultiProcessingSearchRunner +from .serial_runner import SerialSearchRunner +from .rate_runner import RatedMultiThreadingInsertRunner +from vectordb_bench.backend.clients import api +from vectordb_bench.backend.dataset import DatasetManager + +log = logging.getLogger(__name__) + + +class ReadWriteRunner(MultiProcessingSearchRunner, RatedMultiThreadingInsertRunner): + def __init__( + self, + db: api.VectorDB, + dataset: DatasetManager, + insert_rate: int = 1000, + normalize: bool = False, + k: int = 100, + filters: dict | None = None, + concurrencies: Iterable[int] = (1, 15, 50), + search_stage: Iterable[float] = (0.5, 0.6, 0.7, 0.8, 0.9, 1.0), # search in any insert portion, 0.0 means search from the start + read_dur_after_write: int = 300, # seconds, search duration when insertion is done + timeout: float | None = None, + ): + self.insert_rate = insert_rate + self.data_volume = dataset.data.size + + for stage in search_stage: + assert 0.0 <= stage <= 1.0, "each search stage should be in [0.0, 1.0]" + self.search_stage = sorted(search_stage) + self.read_dur_after_write = read_dur_after_write + + log.info(f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, stage_search_dur={read_dur_after_write}") + + test_emb = np.stack(dataset.test_data["emb"]) + if normalize: + test_emb = test_emb / np.linalg.norm(test_emb, axis=1)[:, np.newaxis] + test_emb = test_emb.tolist() + + MultiProcessingSearchRunner.__init__( + self, + db=db, + test_data=test_emb, + k=k, + filters=filters, + concurrencies=concurrencies, + ) + RatedMultiThreadingInsertRunner.__init__( + self, + rate=insert_rate, + db=db, + dataset_iter=iter(dataset), + normalize=normalize, + ) + self.serial_search_runner = SerialSearchRunner( + db=db, + test_data=test_emb, + ground_truth=dataset.gt_data, + k=k, + ) + + def run_read_write(self): + futures = [] + with mp.Manager() as m: + q = m.Queue() + with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context("spawn"), max_workers=2) as executor: + futures.append(executor.submit(self.run_with_rate, q)) + futures.append(executor.submit(self.run_search_by_sig, q)) + + for future in concurrent.futures.as_completed(futures): + res = future.result() + log.info(f"Result = {res}") + + log.info("Concurrent read write all done") + + + def run_search_by_sig(self, q): + res = [] + total_batch = math.ceil(self.data_volume / self.insert_rate) + batch = 0 + recall = 'x' + + for idx, stage in enumerate(self.search_stage): + target_batch = int(total_batch * stage) + while q.get(block=True): + batch += 1 + if batch >= target_batch: + perc = int(stage * 100) + log.info(f"Insert {perc}% done, total batch={total_batch}") + log.info(f"[{batch}/{total_batch}] Serial search - {perc}% start") + recall, ndcg, p99 =self.serial_search_runner.run() + + if idx < len(self.search_stage) - 1: + stage_search_dur = (self.data_volume * (self.search_stage[idx + 1] - stage) // self.insert_rate) // len(self.concurrencies) + if stage_search_dur < 30: + log.warning(f"Search duration too short, please reduce concurrency count or insert rate, or increase dataset volume: dur={stage_search_dur}, concurrencies={len(self.concurrencies)}, insert_rate={self.insert_rate}") + log.info(f"[{batch}/{total_batch}] Conc search - {perc}% start, dur for each conc={stage_search_dur}s") + else: + last_search_dur = self.data_volume * (1.0 - stage) // self.insert_rate + stage_search_dur = last_search_dur + self.read_dur_after_write + log.info(f"[{batch}/{total_batch}] Last conc search - {perc}% start, [read_until_write|read_after_write|total] =[{last_search_dur}s|{self.read_dur_after_write}s|{stage_search_dur}s]") + + max_qps = self.run_by_dur(stage_search_dur) + res.append((perc, max_qps, recall)) + break + return res diff --git a/vectordb_bench/backend/runner/util.py b/vectordb_bench/backend/runner/util.py new file mode 100644 index 000000000..0dfd9d0c4 --- /dev/null +++ b/vectordb_bench/backend/runner/util.py @@ -0,0 +1,32 @@ +import logging +import concurrent +from typing import Iterable + +from pandas import DataFrame +import numpy as np + +log = logging.getLogger(__name__) + +def get_data(data_df: DataFrame, normalize: bool) -> tuple[list[list[float]], list[str]]: + all_metadata = data_df['id'].tolist() + emb_np = np.stack(data_df['emb']) + if normalize: + log.debug("normalize the 100k train data") + all_embeddings = (emb_np / np.linalg.norm(emb_np, axis=1)[:, np.newaxis]).tolist() + else: + all_embeddings = emb_np.tolist() + return all_embeddings, all_metadata + +def is_futures_completed(futures: Iterable[concurrent.futures.Future], interval) -> (Exception, bool): + try: + list(concurrent.futures.as_completed(futures, timeout=interval)) + except TimeoutError as e: + return e, False + return None, True + + +def get_future_exceptions(futures: Iterable[concurrent.futures.Future]) -> BaseException | None: + for f in futures: + if f.exception() is not None: + return f.exception() + return diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py index a6d94f186..c275ebe92 100644 --- a/vectordb_bench/backend/task_runner.py +++ b/vectordb_bench/backend/task_runner.py @@ -150,7 +150,7 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric: ) self._init_search_runner() - + m.qps, m.conc_num_list, m.conc_qps_list, m.conc_latency_p99_list = self._conc_search() m.recall, m.serial_latency_p99 = self._serial_search() ''' @@ -186,7 +186,7 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric: if TaskStage.SEARCH_CONCURRENT in self.config.stages: search_results = self._conc_search() m.qps, m.conc_num_list, m.conc_qps_list, m.conc_latency_p99_list = search_results - + except Exception as e: log.warning(f"Failed to run performance case, reason = {e}") traceback.print_exc()