Skip to content

Commit

Permalink
enhance: [2.5] Add try-catch and return CStatus for NewCollection (#3…
Browse files Browse the repository at this point in the history
…9279) (#39303)

Cherry pick from master
pr: #39279 
Related to #28795

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jan 16, 2025
1 parent 1d9788e commit 21df11b
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 39 deletions.
33 changes: 23 additions & 10 deletions internal/core/src/segcore/collection_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include "common/type_c.h"
#ifdef __linux__
#include <malloc.h>
#endif
Expand All @@ -17,29 +18,41 @@
#include "segcore/collection_c.h"
#include "segcore/Collection.h"

CCollection
NewCollection(const void* schema_proto_blob, const int64_t length) {
auto collection = std::make_unique<milvus::segcore::Collection>(
schema_proto_blob, length);
return (void*)collection.release();
CStatus
NewCollection(const void* schema_proto_blob,
const int64_t length,
CCollection* newCollection) {
try {
auto collection = std::make_unique<milvus::segcore::Collection>(
schema_proto_blob, length);
*newCollection = collection.release();
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
}
}

void
CStatus
SetIndexMeta(CCollection collection,
const void* proto_blob,
const int64_t length) {
auto col = (milvus::segcore::Collection*)collection;
col->parseIndexMeta(proto_blob, length);
try {
auto col = static_cast<milvus::segcore::Collection*>(collection);
col->parseIndexMeta(proto_blob, length);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
}
}

void
DeleteCollection(CCollection collection) {
auto col = (milvus::segcore::Collection*)collection;
auto col = static_cast<milvus::segcore::Collection*>(collection);
delete col;
}

const char*
GetCollectionName(CCollection collection) {
auto col = (milvus::segcore::Collection*)collection;
auto col = static_cast<milvus::segcore::Collection*>(collection);
return strdup(col->get_collection_name().data());
}
9 changes: 6 additions & 3 deletions internal/core/src/segcore/collection_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@
#pragma once

#include <stdint.h>
#include "common/type_c.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef void* CCollection;

CCollection
NewCollection(const void* schema_proto_blob, const int64_t length);
CStatus
NewCollection(const void* schema_proto_blob,
const int64_t length,
CCollection* collection);

void
CStatus
SetIndexMeta(CCollection collection,
const void* proto_blob,
const int64_t length);
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1518,9 +1518,10 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
s.Require().NoError(err)

schema := mock_segcore.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
s.NoError(err)

l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
CollectionID: 1,
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/local_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func (suite *LocalWorkerTestSuite) BeforeTest(suiteName, testName string) {

suite.schema = mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
suite.indexMeta = mock_segcore.GenTestIndexMeta(suite.collectionID, suite.schema)
collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
suite.NoError(err)
loadMata := &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
CollectionID: suite.collectionID,
Expand Down
6 changes: 4 additions & 2 deletions internal/querynodev2/pipeline/insert_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ func (suite *InsertNodeSuite) TestBasic() {
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
in := suite.buildInsertNodeMsg(schema)

collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
suite.NoError(err)
collection.AddPartition(suite.partitionID)

// init mock
Expand Down Expand Up @@ -98,9 +99,10 @@ func (suite *InsertNodeSuite) TestDataTypeNotSupported() {
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
in := suite.buildInsertNodeMsg(schema)

collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
suite.NoError(err)
collection.AddPartition(suite.partitionID)

// init mock
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (suite *PipelineTestSuite) TestBasic() {
// init mock
// mock collection manager
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
DbProperties: []*commonpb.KeyValuePair{
{
Expand All @@ -111,6 +111,7 @@ func (suite *PipelineTestSuite) TestBasic() {
},
},
})
suite.Require().NoError(err)
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection)

// mock mq factory
Expand Down
18 changes: 11 additions & 7 deletions internal/querynodev2/segments/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type CollectionManager interface {
List() []int64
ListWithName() map[int64]string
Get(collectionID int64) *Collection
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo)
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) error
Ref(collectionID int64, count uint32) bool
// unref the collection,
// returns true if the collection ref count goes 0, or the collection not exists,
Expand Down Expand Up @@ -84,22 +84,26 @@ func (m *collectionManager) Get(collectionID int64) *Collection {
return m.collections[collectionID]
}

func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) {
func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) error {
m.mut.Lock()
defer m.mut.Unlock()

if collection, ok := m.collections[collectionID]; ok {
// the schema may be changed even the collection is loaded
collection.schema.Store(schema)
collection.Ref(1)
return
return nil
}

log.Info("put new collection", zap.Int64("collectionID", collectionID), zap.Any("schema", schema))
collection := NewCollection(collectionID, schema, meta, loadMeta)
collection, err := NewCollection(collectionID, schema, meta, loadMeta)
if err != nil {
return err
}
collection.Ref(1)
m.collections[collectionID] = collection
m.updateMetric()
return nil
}

func (m *collectionManager) updateMetric() {
Expand Down Expand Up @@ -245,7 +249,7 @@ func (c *Collection) Unref(count uint32) uint32 {
}

// newCollection returns a new Collection
func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadMetaInfo *querypb.LoadMetaInfo) *Collection {
func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadMetaInfo *querypb.LoadMetaInfo) (*Collection, error) {
/*
CCollection
NewCollection(const char* schema_proto_blob);
Expand Down Expand Up @@ -281,7 +285,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
ccollection, err := segcore.CreateCCollection(req)
if err != nil {
log.Warn("create collection failed", zap.Error(err))
return nil
return nil, err
}
coll := &Collection{
ccollection: ccollection,
Expand All @@ -300,7 +304,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
}
coll.schema.Store(schema)

return coll
return coll, nil
}

// Only for test
Expand Down
8 changes: 5 additions & 3 deletions internal/querynodev2/segments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ func (s *ManagerSuite) SetupTest() {

for i, id := range s.segmentIDs {
schema := mock_segcore.GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64, true)
collection, err := NewCollection(s.collectionIDs[i], schema, mock_segcore.GenTestIndexMeta(s.collectionIDs[i], schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
s.Require().NoError(err)
segment, err := NewSegment(
context.Background(),
NewCollection(s.collectionIDs[i], schema, mock_segcore.GenTestIndexMeta(s.collectionIDs[i], schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
}),
collection,
s.types[i],
0,
&querypb.SegmentLoadInfo{
Expand Down
23 changes: 18 additions & 5 deletions internal/querynodev2/segments/mock_collection_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/querynodev2/segments/segment_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,8 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
PartitionIDs: []int64{suite.partitionID},
}

collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta)
collection, err := NewCollection(suite.collectionID, schema, indexMeta, loadMeta)
suite.Require().NoError(err)
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection).Maybe()
}

Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ func (suite *QueryNodeSuite) TestStop() {
suite.node.manager = segments.NewManager()

schema := mock_segcore.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
suite.Require().NoError(err)
segment, err := segments.NewSegment(
context.Background(),
collection,
Expand Down
12 changes: 10 additions & 2 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,12 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
return merr.Success(), nil
}

node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
err := node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
node.composeIndexMeta(ctx, req.GetIndexInfoList(), req.Schema), req.GetLoadMeta())
if err != nil {
log.Warn("failed to ref collection", zap.Error(err))
return merr.Status(err), nil
}
defer func() {
if !merr.Ok(status) {
node.manager.Collection.Unref(req.GetCollectionID(), 1)
Expand Down Expand Up @@ -474,8 +478,12 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
return merr.Success(), nil
}

node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
err := node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
node.composeIndexMeta(ctx, req.GetIndexInfoList(), req.GetSchema()), req.GetLoadMeta())
if err != nil {
log.Warn("failed to ref collection", zap.Error(err))
return merr.Status(err), nil
}
defer node.manager.Collection.Unref(req.GetCollectionID(), 1)

if req.GetLoadScope() == querypb.LoadScope_Delta {
Expand Down
Loading

0 comments on commit 21df11b

Please sign in to comment.