diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 3b8cb7a5027f4..3c94119a78b7d 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -303,6 +303,7 @@ message LoadSegmentsRequest { bool need_transfer = 11; LoadScope load_scope = 12; repeated index.IndexInfo index_info_list = 13; + repeated common.KeyValuePair collection_properties = 14; } message ReleaseSegmentsRequest { diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index c0ecd97f068d7..d77324daa8ae6 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -129,17 +129,18 @@ func packLoadSegmentRequest( commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments), commonpbutil.WithMsgID(task.ID()), ), - Infos: []*querypb.SegmentLoadInfo{loadInfo}, - Schema: schema, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3 - LoadMeta: loadMeta, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3 - CollectionID: task.CollectionID(), - ReplicaID: task.ReplicaID(), - DeltaPositions: []*msgpb.MsgPosition{loadInfo.GetDeltaPosition()}, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3 - DstNodeID: action.Node(), - Version: time.Now().UnixNano(), - NeedTransfer: true, - IndexInfoList: indexInfo, - LoadScope: loadScope, + Infos: []*querypb.SegmentLoadInfo{loadInfo}, + Schema: schema, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3 + LoadMeta: loadMeta, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3 + CollectionID: task.CollectionID(), + ReplicaID: task.ReplicaID(), + DeltaPositions: []*msgpb.MsgPosition{loadInfo.GetDeltaPosition()}, // assign it for compatibility of rolling upgrade from 2.2.x to 2.3 + DstNodeID: action.Node(), + Version: time.Now().UnixNano(), + NeedTransfer: true, + IndexInfoList: indexInfo, + LoadScope: loadScope, + CollectionProperties: collectionProperties, } } diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 921c679f011e5..0fc8dd8106d4a 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -747,24 +747,41 @@ func getVectorFieldFromSchema(schema *schemapb.CollectionSchema) (*VectorField, // Todo support call external hook func (sd *shardDelegator) OptimizeSearchBasedOnClustering(req *querypb.SearchRequest, sealeds []SnapshotItem) (*querypb.SearchRequest, []SnapshotItem) { log := log.With(zap.Int64("searchRequestID", req.GetReq().GetReqID())) - if !paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.GetAsBool() { + if !paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.GetAsBool() { log.Debug("skip OptimizeSearchBasedOnClustering by system config") return req, sealeds } - if req.GetReq().GetClusteringOptions().GetFilterRatio() >= 1 { - log.Debug("skip OptimizeSearchBasedOnClustering by user config") + + if sd.vectorField.dataType != schemapb.DataType_FloatVector { + log.Debug("Currently we only support FloatVector") + return req, sealeds + } + + // filter ratio set priority: user client config > collection config > system config + var filterRatio float64 + collectionPropertiesMap := funcutil.KeyValuePair2Map(sd.collection.GetCollectionProperties()) + ratio, exist := collectionPropertiesMap[clustering.CollectionClusteringOptimizeSearchFilterRatio] + if !exist { + ratio = clustering.DefaultCollectionClusteringOptimizeSearchFilterRatio + } + filterRatio, err := strconv.ParseFloat(ratio, 64) + if err != nil { + log.Error("Error format for filterRatio", zap.String("ratio", ratio), zap.Error(err)) return req, sealeds } + if req.GetReq().GetClusteringOptions().GetFilterRatio() != 0 { + filterRatio = float64(req.GetReq().GetClusteringOptions().GetFilterRatio()) + } + metricType := req.GetReq().GetMetricType() topK := req.GetReq().GetTopk() - filterRatio := req.GetReq().GetClusteringOptions().GetFilterRatio() - log.Debug("SearchRequest parameter", + log.Debug("Search parameter", zap.String("metricType", metricType), zap.Int64("topK", topK), - zap.Float32("filterRatio", filterRatio)) + zap.Float64("filterRatio", filterRatio)) var phg commonpb.PlaceholderGroup - err := proto.Unmarshal(req.GetReq().GetPlaceholderGroup(), &phg) + err = proto.Unmarshal(req.GetReq().GetPlaceholderGroup(), &phg) if err != nil { log.Warn("fail to parse SearchRequest PlaceholderGroup", zap.Error(err)) return req, sealeds @@ -832,7 +849,7 @@ func (sd *shardDelegator) OptimizeSearchBasedOnClustering(req *querypb.SearchReq } toFilterSegNum := len(vectorSegmentDistance) - targetSegNum := int(float32(toFilterSegNum) * filterRatio) + targetSegNum := int(float64(toFilterSegNum) * filterRatio) var optimizedRowNums int64 for i, segmentDistance := range vectorSegmentDistance { if i < targetSegNum || optimizedRowNums < topK { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index cfb64dc9e167b..68e11e2af3a83 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -127,7 +127,7 @@ func (s *DelegatorDataSuite) SetupTest() { }, }, &querypb.LoadMetaInfo{ LoadType: querypb.LoadType_LoadCollection, - }) + }, nil) s.mq = &msgstream.MockMsgStream{} diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 78ffae531b10c..d6f42e513fa0b 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -153,7 +153,7 @@ func (s *DelegatorSuite) SetupTest() { }, }, &querypb.LoadMetaInfo{ PartitionIDs: s.partitionIDs, - }) + }, nil) s.mq = &msgstream.MockMsgStream{} @@ -1269,7 +1269,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_config() { assert.NotEmpty(s.T(), snapshots) assert.Equal(s.T(), 2, len(snapshots[0].Segments)) - paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "false") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "false") sr2, snapshots2 := s.delegator.OptimizeSearchBasedOnClustering(req, sealeds) assert.NotEmpty(s.T(), sr2) assert.NotEmpty(s.T(), snapshots2) @@ -1279,7 +1279,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_config() { func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq1() { s.delegator.Start() paramtable.SetNodeID(1) - paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "true") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "true") vector1 := []float32{0.40448594, 0.16214314, 0.17850745, 0.6640584, 0.77309024, 0.48807725, 0.66572666, 0.15990956} vectors := [][]float32{vector1} @@ -1360,7 +1360,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq1() { func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq2() { s.delegator.Start() paramtable.SetNodeID(1) - paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "true") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "true") vector1 := []float32{0.8877872002188053, 0.6131822285635065, 0.8476814632326242, 0.6645877829359371, 0.9962627712600025, 0.8976183052440327, 0.41941169325798844, 0.7554387854258499} vector2 := []float32{0.8644394874390322, 0.023327886647378615, 0.08330118483461302, 0.7068040179963112, 0.6983994910799851, 0.5562075958994153, 0.3288536247938002, 0.07077341010237759} vectors := [][]float32{vector1, vector2} @@ -1440,7 +1440,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq2() { func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq1_LargeTopK() { s.delegator.Start() paramtable.SetNodeID(1) - paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "true") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "true") vector1 := []float32{0.40448594, 0.16214314, 0.17850745, 0.6640584, 0.77309024, 0.48807725, 0.66572666, 0.15990956} vectors := [][]float32{vector1} @@ -1531,7 +1531,7 @@ func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_nq1_LargeTopK() { func (s *DelegatorSuite) TestOptimizeSearchBasedOnClustering_WrongClusteringInfo() { s.delegator.Start() paramtable.SetNodeID(1) - paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSearchBasedOnClustering.Key, "true") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.ClusteringOptimizeSearchEnable.Key, "true") vector1 := []float32{0.40448594, 0.16214314, 0.17850745, 0.6640584, 0.77309024, 0.48807725, 0.66572666, 0.15990956} vectors := [][]float32{vector1} diff --git a/internal/querynodev2/local_worker_test.go b/internal/querynodev2/local_worker_test.go index b34becbf8a106..557e2fd28187d 100644 --- a/internal/querynodev2/local_worker_test.go +++ b/internal/querynodev2/local_worker_test.go @@ -94,12 +94,12 @@ func (suite *LocalWorkerTestSuite) BeforeTest(suiteName, testName string) { suite.schema = segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64) suite.indexMeta = segments.GenTestIndexMeta(suite.collectionID, suite.schema) - collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, querypb.LoadType_LoadCollection) + collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, querypb.LoadType_LoadCollection, nil) loadMata := &querypb.LoadMetaInfo{ LoadType: querypb.LoadType_LoadCollection, CollectionID: suite.collectionID, } - suite.node.manager.Collection.PutOrRef(suite.collectionID, collection.Schema(), suite.indexMeta, loadMata) + suite.node.manager.Collection.PutOrRef(suite.collectionID, collection.Schema(), suite.indexMeta, loadMata, nil) suite.worker = NewLocalWorker(suite.node) } diff --git a/internal/querynodev2/pipeline/insert_node_test.go b/internal/querynodev2/pipeline/insert_node_test.go index 6d6979fa9b71c..b37f8354542fb 100644 --- a/internal/querynodev2/pipeline/insert_node_test.go +++ b/internal/querynodev2/pipeline/insert_node_test.go @@ -61,7 +61,7 @@ func (suite *InsertNodeSuite) TestBasic() { schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64) in := suite.buildInsertNodeMsg(schema) - collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection) + collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, nil) collection.AddPartition(suite.partitionID) // init mock @@ -95,7 +95,7 @@ func (suite *InsertNodeSuite) TestDataTypeNotSupported() { schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64) in := suite.buildInsertNodeMsg(schema) - collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection) + collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, nil) collection.AddPartition(suite.partitionID) // init mock diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 548c1882427b0..ee0a233db4974 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -109,7 +109,7 @@ func (suite *PipelineTestSuite) TestBasic() { // init mock // mock collection manager schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64) - collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection) + collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, nil) suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection) // mock mq factory diff --git a/internal/querynodev2/segments/collection.go b/internal/querynodev2/segments/collection.go index 89f12b463b083..045975c30af25 100644 --- a/internal/querynodev2/segments/collection.go +++ b/internal/querynodev2/segments/collection.go @@ -45,7 +45,7 @@ import ( type CollectionManager interface { Get(collectionID int64) *Collection - PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) + PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo, collectionProperties []*commonpb.KeyValuePair) Ref(collectionID int64, count uint32) bool // unref the collection, // returns true if the collection ref count goes 0, or the collection not exists, @@ -71,18 +71,20 @@ func (m *collectionManager) Get(collectionID int64) *Collection { return m.collections[collectionID] } -func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) { +func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo, collectionProperties []*commonpb.KeyValuePair) { m.mut.Lock() defer m.mut.Unlock() if collection, ok := m.collections[collectionID]; ok { // the schema may be changed even the collection is loaded collection.schema.Store(schema) + // update collectionProperties + collection.collectionProperties = collectionProperties collection.Ref(1) return } - collection := NewCollection(collectionID, schema, meta, loadMeta.GetLoadType()) + collection := NewCollection(collectionID, schema, meta, loadMeta.GetLoadType(), collectionProperties) collection.AddPartition(loadMeta.GetPartitionIDs()...) collection.Ref(1) m.collections[collectionID] = collection @@ -129,6 +131,8 @@ type Collection struct { isGpuIndex bool refCount *atomic.Uint32 + + collectionProperties []*commonpb.KeyValuePair } // ID returns collection id @@ -192,8 +196,12 @@ func (c *Collection) Unref(count uint32) uint32 { return refCount } +func (c *Collection) GetCollectionProperties() []*commonpb.KeyValuePair { + return c.collectionProperties +} + // newCollection returns a new Collection -func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadType querypb.LoadType) *Collection { +func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadType querypb.LoadType, collectionProperties []*commonpb.KeyValuePair) *Collection { /* CCollection NewCollection(const char* schema_proto_blob); @@ -226,12 +234,13 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM } coll := &Collection{ - collectionPtr: collection, - id: collectionID, - partitions: typeutil.NewConcurrentSet[int64](), - loadType: loadType, - refCount: atomic.NewUint32(0), - isGpuIndex: isGpuIndex, + collectionPtr: collection, + id: collectionID, + partitions: typeutil.NewConcurrentSet[int64](), + loadType: loadType, + refCount: atomic.NewUint32(0), + isGpuIndex: isGpuIndex, + collectionProperties: collectionProperties, } coll.schema.Store(schema) diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 86c552e7c4a69..aa3712f6f7d83 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -46,7 +46,7 @@ func (s *ManagerSuite) SetupTest() { schema := GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64) segment, err := NewSegment( context.Background(), - NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection), + NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection, nil), id, s.partitionIDs[i], s.collectionIDs[i], diff --git a/internal/querynodev2/segments/mock_collection_manager.go b/internal/querynodev2/segments/mock_collection_manager.go index 5e11244a492c3..381fccecc69c1 100644 --- a/internal/querynodev2/segments/mock_collection_manager.go +++ b/internal/querynodev2/segments/mock_collection_manager.go @@ -3,6 +3,7 @@ package segments import ( + commonpb "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" querypb "github.com/milvus-io/milvus/internal/proto/querypb" mock "github.com/stretchr/testify/mock" @@ -67,9 +68,9 @@ func (_c *MockCollectionManager_Get_Call) RunAndReturn(run func(int64) *Collecti return _c } -// PutOrRef provides a mock function with given fields: collectionID, schema, meta, loadMeta -func (_m *MockCollectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) { - _m.Called(collectionID, schema, meta, loadMeta) +// PutOrRef provides a mock function with given fields: collectionID, schema, meta, loadMeta, collectionProperties +func (_m *MockCollectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo, collectionProperties []*commonpb.KeyValuePair) { + _m.Called(collectionID, schema, meta, loadMeta, collectionProperties) } // MockCollectionManager_PutOrRef_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PutOrRef' @@ -82,13 +83,14 @@ type MockCollectionManager_PutOrRef_Call struct { // - schema *schemapb.CollectionSchema // - meta *segcorepb.CollectionIndexMeta // - loadMeta *querypb.LoadMetaInfo -func (_e *MockCollectionManager_Expecter) PutOrRef(collectionID interface{}, schema interface{}, meta interface{}, loadMeta interface{}) *MockCollectionManager_PutOrRef_Call { - return &MockCollectionManager_PutOrRef_Call{Call: _e.mock.On("PutOrRef", collectionID, schema, meta, loadMeta)} +// - collectionProperties []*commonpb.KeyValuePair +func (_e *MockCollectionManager_Expecter) PutOrRef(collectionID interface{}, schema interface{}, meta interface{}, loadMeta interface{}, collectionProperties interface{}) *MockCollectionManager_PutOrRef_Call { + return &MockCollectionManager_PutOrRef_Call{Call: _e.mock.On("PutOrRef", collectionID, schema, meta, loadMeta, collectionProperties)} } -func (_c *MockCollectionManager_PutOrRef_Call) Run(run func(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo)) *MockCollectionManager_PutOrRef_Call { +func (_c *MockCollectionManager_PutOrRef_Call) Run(run func(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo, collectionProperties []*commonpb.KeyValuePair)) *MockCollectionManager_PutOrRef_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].(*schemapb.CollectionSchema), args[2].(*segcorepb.CollectionIndexMeta), args[3].(*querypb.LoadMetaInfo)) + run(args[0].(int64), args[1].(*schemapb.CollectionSchema), args[2].(*segcorepb.CollectionIndexMeta), args[3].(*querypb.LoadMetaInfo), args[4].([]*commonpb.KeyValuePair)) }) return _c } @@ -98,7 +100,7 @@ func (_c *MockCollectionManager_PutOrRef_Call) Return() *MockCollectionManager_P return _c } -func (_c *MockCollectionManager_PutOrRef_Call) RunAndReturn(run func(int64, *schemapb.CollectionSchema, *segcorepb.CollectionIndexMeta, *querypb.LoadMetaInfo)) *MockCollectionManager_PutOrRef_Call { +func (_c *MockCollectionManager_PutOrRef_Call) RunAndReturn(run func(int64, *schemapb.CollectionSchema, *segcorepb.CollectionIndexMeta, *querypb.LoadMetaInfo, []*commonpb.KeyValuePair)) *MockCollectionManager_PutOrRef_Call { _c.Call.Return(run) return _c } diff --git a/internal/querynodev2/segments/plan_test.go b/internal/querynodev2/segments/plan_test.go index 6fd7772dc13c4..29b48a191d503 100644 --- a/internal/querynodev2/segments/plan_test.go +++ b/internal/querynodev2/segments/plan_test.go @@ -43,7 +43,7 @@ func (suite *PlanSuite) SetupTest() { suite.partitionID = 10 suite.segmentID = 1 schema := GenTestCollectionSchema("plan-suite", schemapb.DataType_Int64) - suite.collection = NewCollection(suite.collectionID, schema, GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection) + suite.collection = NewCollection(suite.collectionID, schema, GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, nil) suite.collection.AddPartition(suite.partitionID) } diff --git a/internal/querynodev2/segments/reduce_test.go b/internal/querynodev2/segments/reduce_test.go index 2381cdd54e803..c1230258ce46c 100644 --- a/internal/querynodev2/segments/reduce_test.go +++ b/internal/querynodev2/segments/reduce_test.go @@ -71,6 +71,7 @@ func (suite *ReduceSuite) SetupTest() { schema, GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection, + nil, ) suite.segment, err = NewSegment(ctx, suite.collection, diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index 2d459378cddb9..ddf3a96a17aec 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -80,6 +80,7 @@ func (suite *RetrieveSuite) SetupTest() { CollectionID: suite.collectionID, PartitionIDs: []int64{suite.partitionID}, }, + nil, ) suite.collection = suite.manager.Collection.Get(suite.collectionID) diff --git a/internal/querynodev2/segments/search_test.go b/internal/querynodev2/segments/search_test.go index db657a275bbca..53c8a468221e1 100644 --- a/internal/querynodev2/segments/search_test.go +++ b/internal/querynodev2/segments/search_test.go @@ -71,6 +71,7 @@ func (suite *SearchSuite) SetupTest() { CollectionID: suite.collectionID, PartitionIDs: []int64{suite.partitionID}, }, + nil, ) suite.collection = suite.manager.Collection.Get(suite.collectionID) diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index 1d0549da0146e..375c70111e888 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -83,7 +83,7 @@ func (suite *SegmentLoaderSuite) SetupTest() { CollectionID: suite.collectionID, PartitionIDs: []int64{suite.partitionID}, } - suite.manager.Collection.PutOrRef(suite.collectionID, suite.schema, indexMeta, loadMeta) + suite.manager.Collection.PutOrRef(suite.collectionID, suite.schema, indexMeta, loadMeta, nil) } func (suite *SegmentLoaderSuite) TearDownTest() { @@ -684,7 +684,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() { PartitionIDs: []int64{suite.partitionID}, } - collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta.GetLoadType()) + collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta.GetLoadType(), nil) suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection).Maybe() } diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 372b1d2de7979..b0320a0c2985a 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -58,6 +58,7 @@ func (suite *SegmentSuite) SetupTest() { CollectionID: suite.collectionID, PartitionIDs: []int64{suite.partitionID}, }, + nil, ) suite.collection = suite.manager.Collection.Get(suite.collectionID) diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index ed32fb4c32c86..ae2bbe47e25a1 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -219,7 +219,7 @@ func (suite *QueryNodeSuite) TestStop() { suite.node.manager = segments.NewManager() schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64) - collection := segments.NewCollection(1, schema, nil, querypb.LoadType_LoadCollection) + collection := segments.NewCollection(1, schema, nil, querypb.LoadType_LoadCollection, nil) segment, err := segments.NewSegment( context.Background(), collection, diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index a1e7091da867c..10a0300641ee2 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -245,7 +245,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm } node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(), - node.composeIndexMeta(req.GetIndexInfoList(), req.Schema), req.GetLoadMeta()) + node.composeIndexMeta(req.GetIndexInfoList(), req.Schema), req.GetLoadMeta(), nil) delegator, err := delegator.NewShardDelegator( ctx, @@ -457,7 +457,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen } node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(), - node.composeIndexMeta(req.GetIndexInfoList(), req.GetSchema()), req.GetLoadMeta()) + node.composeIndexMeta(req.GetIndexInfoList(), req.GetSchema()), req.GetLoadMeta(), req.GetCollectionProperties()) defer node.manager.Collection.Unref(req.GetCollectionID(), 1) if req.GetLoadScope() == querypb.LoadScope_Delta { diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index e74f723ae3819..e79994ce2a43c 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -585,7 +585,7 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() { PartitionIDs: suite.partitionIDs, } suite.node.manager.Collection = segments.NewCollectionManager() - suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, nil, loadMeta) + suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, nil, loadMeta, nil) infos := suite.genSegmentLoadInfos(schema, nil) for _, info := range infos { @@ -1225,7 +1225,7 @@ func (suite *ServiceSuite) TestSearch_Failed() { PartitionIDs: suite.partitionIDs, } indexMeta := suite.node.composeIndexMeta(segments.GenTestIndexInfoList(suite.collectionID, schema), schema) - suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, indexMeta, LoadMeta) + suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, indexMeta, LoadMeta, nil) // Delegator not found resp, err = suite.node.Search(ctx, req) diff --git a/internal/util/clustering/clustering.go b/internal/util/clustering/clustering.go index 8add1761baf57..32817e42735ff 100644 --- a/internal/util/clustering/clustering.go +++ b/internal/util/clustering/clustering.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" ) @@ -36,7 +37,9 @@ const ( ClusteringId = "clustering.id" ClusteringOperationid = "clustering.operationID" - SearchClusteringFilterRatio = "clustering.filter_ratio" + SearchClusteringFilterRatio = "clustering.filter_ratio" + CollectionClusteringOptimizeSearchFilterRatio = common.CollectionClusteringOptimizeSearchFilterRatio + DefaultCollectionClusteringOptimizeSearchFilterRatio = "0.5" ) func SearchClusteringOptions(kv []*commonpb.KeyValuePair) (*internalpb.SearchClusteringOptions, error) { diff --git a/pkg/common/common.go b/pkg/common/common.go index 3cc05025a4a01..59d63f554264f 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -123,6 +123,8 @@ const ( CollectionSearchRateMaxKey = "collection.searchRate.max.vps" CollectionSearchRateMinKey = "collection.searchRate.min.vps" CollectionDiskQuotaKey = "collection.diskProtection.diskQuota.mb" + + CollectionClusteringOptimizeSearchFilterRatio = "collection.clusteringOptimizeSearch.filterRatio" ) // common properties diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 2a7da083203b0..7bc74495c7908 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1790,7 +1790,7 @@ type queryNodeConfig struct { ExprEvalBatchSize ParamItem `refreshable:"false"` - EnableSearchBasedOnClustering ParamItem `refreshable:"true"` + ClusteringOptimizeSearchEnable ParamItem `refreshable:"true"` } func (p *queryNodeConfig) init(base *BaseTable) { @@ -2203,13 +2203,13 @@ Max read concurrency must greater than or equal to 1, and less than or equal to p.ExprEvalBatchSize.Init(base.mgr) - p.EnableSearchBasedOnClustering = ParamItem{ - Key: "queryNode.enableSearchBasedOnClustering", + p.ClusteringOptimizeSearchEnable = ParamItem{ + Key: "queryNode.clusteringOptimizeSearch.enabled", Version: "2.4.0", DefaultValue: "true", Doc: "whether enable use clustering info to optimize search", } - p.EnableSearchBasedOnClustering.Init(base.mgr) + p.ClusteringOptimizeSearchEnable.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 0a334e74602e2..a396a6d2d2808 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -349,7 +349,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, false, Params.EnableWorkerSQCostMetrics.GetAsBool()) - assert.Equal(t, true, Params.EnableSearchBasedOnClustering.GetAsBool()) + assert.Equal(t, true, Params.ClusteringOptimizeSearchEnable.GetAsBool()) }) t.Run("test dataCoordConfig", func(t *testing.T) {