diff --git a/pymilvus/client/interceptor.py b/pymilvus/client/interceptor.py
index 29a9fb24d..b879f5648 100644
--- a/pymilvus/client/interceptor.py
+++ b/pymilvus/client/interceptor.py
@@ -13,8 +13,7 @@
 # limitations under the License.
 """Base class for interceptors that operate on all RPC types."""
 
-import collections
-from typing import Any, Callable, List
+from typing import Any, Callable, List, NamedTuple
 
 import grpc
 
@@ -74,7 +73,10 @@ def intercept_stream_stream(
 
 
 class _ClientCallDetails(
-    collections.namedtuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")),
+    NamedTuple(
+        "_ClientCallDetails",
+        [("method", Any), ("timeout", Any), ("metadata", Any), ("credentials", Any)],
+    ),
     grpc.ClientCallDetails,
 ):
     pass
diff --git a/pymilvus/orm/collection.py b/pymilvus/orm/collection.py
index 6d1d357f4..8357677c0 100644
--- a/pymilvus/orm/collection.py
+++ b/pymilvus/orm/collection.py
@@ -12,6 +12,7 @@
 
 import copy
 import json
+import time
 from typing import Dict, List, Optional, Union
 
 import pandas as pd
@@ -20,6 +21,7 @@
 from pymilvus.client.types import (
     CompactionPlans,
     CompactionState,
+    LoadState,
     Replica,
     cmp_consistency_level,
     get_consistency_level,
@@ -29,6 +31,7 @@
     DataTypeNotMatchException,
     ExceptionsMessage,
     IndexNotExistException,
+    MilvusException,
     PartitionAlreadyExistException,
     PartitionNotExistException,
     SchemaNotReadyException,
@@ -1360,3 +1363,59 @@ def get_replicas(self, timeout: Optional[float] = None, **kwargs) -> Replica:
     def describe(self, timeout: Optional[float] = None):
         conn = self._get_connection()
         return conn.describe_collection(self.name, timeout=timeout)
+
+    def optimize(self, timeout: Optional[float] = None, **kwargs):
+        """Optimize the server to gain the best performance.
+
+        Flush the collection, wait until the index reports no pending rows,
+        compact, and refresh the load if the collection is loaded or loading.
+
+        Be careful, by default this method may hang very very long.
+        The collection should be INDEXED before optimize.
+
+        Args:
+            timeout (float, optional): overall time budget in seconds; it is also
+                passed to the flush, index-progress and load-wait calls.
+                Defaults to 12 hours when omitted or falsy.
+            **kwargs: currently unused.
+
+        Raises:
+            MilvusException: if the collection has no index, or if the time budget
+                runs out before the optimization loop finishes.
+        """
+
+        timeout = timeout or 12 * 60 * 60  # set default timeout to 12hrs
+
+        start_time = time.time()
+        conn = self._get_connection()
+
+        # check if indexed
+        if not self.has_index():
+            raise MilvusException(message="Please index before calling optimize")
+
+        self.flush(timeout=timeout)
+        index = self.index()
+
+        def has_pending_rows() -> bool:
+            info = conn.get_index_build_progress(self.name, index.index_name, timeout=timeout)
+            return info.get("pending_index_rows", -1) > 0
+
+        while True:
+            if not has_pending_rows():
+                self.compact()
+                self.wait_for_compaction_completed()
+                if not has_pending_rows():
+                    break
+
+            if time.time() - start_time > timeout:
+                raise MilvusException(message=f"Wait for optimize timeout in {timeout}s")
+
+            time.sleep(5)
+
+        # If loaded, load refresh
+        state = conn.get_load_state(self.name)
+        if state == LoadState.Loaded:
+            self.load(_refresh=True)
+        elif state == LoadState.Loading:
+            conn.wait_for_loading_collection(self.name, timeout=timeout)
+            self.load(_refresh=True)
diff --git a/pyproject.toml b/pyproject.toml
index 77cf239fa..7ed9644fa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -87,8 +87,7 @@ ignore = [
     "ARG002",
     "E501", # black takes care of it
     "ARG005", # [ruff] ARG005 Unused lambda argument: `disable` [E]
-    "TRY400",
-    "PYI024"
+    "TRY400", # TRY400 Use `logging.exception` instead of `logging.error` TODO
 ]
 
 # Allow autofix for all enabled rules (when `--fix`) is provided.