Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optimize to get best performance #1654

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pymilvus/client/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
# limitations under the License.
"""Base class for interceptors that operate on all RPC types."""

import collections
from typing import Any, Callable, List
from typing import Any, Callable, List, NamedTuple

import grpc

Expand Down Expand Up @@ -74,7 +73,7 @@ def intercept_stream_stream(


class _ClientCallDetails(
    # The functional form of typing.NamedTuple takes (name, type) pairs; a bare
    # tuple of field-name strings fails when the class statement is evaluated.
    NamedTuple(
        "_ClientCallDetails",
        [
            ("method", Any),
            ("timeout", Any),
            ("metadata", Any),
            ("credentials", Any),
        ],
    ),
    grpc.ClientCallDetails,
):
    """Named-tuple record of call details that is also a ``grpc.ClientCallDetails``.

    Fields, in positional order: ``method``, ``timeout``, ``metadata``,
    ``credentials``.
    """
Expand Down
46 changes: 46 additions & 0 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import copy
import json
import time
from typing import Dict, List, Optional, Union

import pandas as pd
Expand All @@ -20,6 +21,7 @@
from pymilvus.client.types import (
CompactionPlans,
CompactionState,
LoadState,
Replica,
cmp_consistency_level,
get_consistency_level,
Expand All @@ -29,6 +31,7 @@
DataTypeNotMatchException,
ExceptionsMessage,
IndexNotExistException,
MilvusException,
PartitionAlreadyExistException,
PartitionNotExistException,
SchemaNotReadyException,
Expand Down Expand Up @@ -1360,3 +1363,46 @@ def get_replicas(self, timeout: Optional[float] = None, **kwargs) -> Replica:
def describe(self, timeout: Optional[float] = None):
    """Return the connection's description of this collection.

    Args:
        timeout: Passed through unchanged to ``describe_collection``.

    Returns:
        Whatever ``describe_collection`` returns for ``self.name``.
    """
    return self._get_connection().describe_collection(self.name, timeout=timeout)

def optimize(self, timeout: Optional[float] = None, **kwargs):
    """Optimize the collection on the server to gain the best performance.

    The collection must be indexed before calling this method. The steps are:

    1. Flush the collection.
    2. Poll the index build progress until no rows are pending.
    3. Compact and wait for the compaction to complete.
    4. Re-check for pending rows; if any appeared, go back to step 2.
    5. If the collection is loaded (or currently loading), refresh the load.

    Be careful: with the default timeout this call may block for a very
    long time.

    Args:
        timeout: Overall time budget in seconds for the whole operation.
            ``None`` (or any falsy value) selects the default of 12 hours.
            Each blocking step is given only the time still remaining.
        **kwargs: Accepted for interface compatibility; currently unused.

    Raises:
        MilvusException: If the collection has no index, or if the overall
            time budget is exhausted before optimization finishes.
    """
    budget = timeout or 12 * 60 * 60  # default budget: 12 hours

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + budget

    def remaining() -> float:
        """Return the seconds left in the budget, raising once it is spent."""
        left = deadline - time.monotonic()
        if left <= 0:
            raise MilvusException(message=f"Wait for optimize timeout in {budget}s")
        return left

    conn = self._get_connection()

    # Optimizing is only meaningful once an index exists.
    if not self.has_index(timeout=remaining()):
        raise MilvusException(message="Please index before calling optimize")

    self.flush(timeout=remaining())
    index = self.index()

    def has_pending_rows() -> bool:
        info = conn.get_index_build_progress(
            self.name, index.index_name, timeout=remaining()
        )
        # A missing key yields -1, which counts as "nothing pending".
        return info.get("pending_index_rows", -1) > 0

    while True:
        if not has_pending_rows():
            self.compact(timeout=remaining())
            self.wait_for_compaction_completed(timeout=remaining())
            # Compaction can produce rows that need indexing again; only stop
            # when nothing is pending after it has completed.
            if not has_pending_rows():
                break

        # Never sleep past the deadline; remaining() raises when it is reached.
        time.sleep(min(5, remaining()))

    # Refresh the load so an already loaded (or loading) collection is reloaded
    # after compaction.
    state = conn.get_load_state(self.name, timeout=remaining())
    if state == LoadState.Loading:
        conn.wait_for_loading_collection(self.name, timeout=remaining())
    if state in (LoadState.Loaded, LoadState.Loading):
        self.load(_refresh=True, timeout=remaining())
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ ignore = [
"ARG002",
"E501", # black takes care of it
"ARG005", # [ruff] ARG005 Unused lambda argument: `disable` [E]
"TRY400",
"PYI024"
"TRY400", # TRY400 Use `logging.exception` instead of `logging.error` TODO
]

# Allow autofix for all enabled rules (when `--fix`) is provided.
Expand Down