Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: [2.5] Add try-catch and return CStatus for NewCollection (#39279) #39303

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions internal/core/src/segcore/collection_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include "common/type_c.h"
#ifdef __linux__
#include <malloc.h>
#endif
Expand All @@ -17,29 +18,41 @@
#include "segcore/collection_c.h"
#include "segcore/Collection.h"

CCollection
NewCollection(const void* schema_proto_blob, const int64_t length) {
auto collection = std::make_unique<milvus::segcore::Collection>(
schema_proto_blob, length);
return (void*)collection.release();
CStatus
NewCollection(const void* schema_proto_blob,

Check warning on line 22 in internal/core/src/segcore/collection_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/collection_c.cpp#L22

Added line #L22 was not covered by tests
const int64_t length,
CCollection* newCollection) {
try {
auto collection = std::make_unique<milvus::segcore::Collection>(
schema_proto_blob, length);
*newCollection = collection.release();
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
}

Check warning on line 32 in internal/core/src/segcore/collection_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/collection_c.cpp#L25-L32

Added lines #L25 - L32 were not covered by tests
}

void
CStatus
SetIndexMeta(CCollection collection,
const void* proto_blob,
const int64_t length) {
auto col = (milvus::segcore::Collection*)collection;
col->parseIndexMeta(proto_blob, length);
try {
auto col = static_cast<milvus::segcore::Collection*>(collection);
col->parseIndexMeta(proto_blob, length);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
}

Check warning on line 45 in internal/core/src/segcore/collection_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/collection_c.cpp#L43-L45

Added lines #L43 - L45 were not covered by tests
}

void
DeleteCollection(CCollection collection) {
auto col = (milvus::segcore::Collection*)collection;
auto col = static_cast<milvus::segcore::Collection*>(collection);
delete col;
}

const char*
GetCollectionName(CCollection collection) {
auto col = (milvus::segcore::Collection*)collection;
auto col = static_cast<milvus::segcore::Collection*>(collection);
return strdup(col->get_collection_name().data());
}
9 changes: 6 additions & 3 deletions internal/core/src/segcore/collection_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@
#pragma once

#include <stdint.h>
#include "common/type_c.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef void* CCollection;

CCollection
NewCollection(const void* schema_proto_blob, const int64_t length);
CStatus
NewCollection(const void* schema_proto_blob,
const int64_t length,
CCollection* collection);

void
CStatus
SetIndexMeta(CCollection collection,
const void* proto_blob,
const int64_t length);
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1518,9 +1518,10 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
s.Require().NoError(err)

schema := mock_segcore.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
s.NoError(err)

l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
CollectionID: 1,
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/local_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func (suite *LocalWorkerTestSuite) BeforeTest(suiteName, testName string) {

suite.schema = mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
suite.indexMeta = mock_segcore.GenTestIndexMeta(suite.collectionID, suite.schema)
collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
suite.NoError(err)
loadMata := &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
CollectionID: suite.collectionID,
Expand Down
6 changes: 4 additions & 2 deletions internal/querynodev2/pipeline/insert_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ func (suite *InsertNodeSuite) TestBasic() {
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
in := suite.buildInsertNodeMsg(schema)

collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
suite.NoError(err)
collection.AddPartition(suite.partitionID)

// init mock
Expand Down Expand Up @@ -98,9 +99,10 @@ func (suite *InsertNodeSuite) TestDataTypeNotSupported() {
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
in := suite.buildInsertNodeMsg(schema)

collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
suite.NoError(err)
collection.AddPartition(suite.partitionID)

// init mock
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (suite *PipelineTestSuite) TestBasic() {
// init mock
// mock collection manager
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
DbProperties: []*commonpb.KeyValuePair{
{
Expand All @@ -111,6 +111,7 @@ func (suite *PipelineTestSuite) TestBasic() {
},
},
})
suite.Require().NoError(err)
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection)

// mock mq factory
Expand Down
18 changes: 11 additions & 7 deletions internal/querynodev2/segments/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type CollectionManager interface {
List() []int64
ListWithName() map[int64]string
Get(collectionID int64) *Collection
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo)
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) error
Ref(collectionID int64, count uint32) bool
// unref the collection,
// returns true if the collection ref count goes 0, or the collection not exists,
Expand Down Expand Up @@ -84,22 +84,26 @@ func (m *collectionManager) Get(collectionID int64) *Collection {
return m.collections[collectionID]
}

func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) {
func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) error {
m.mut.Lock()
defer m.mut.Unlock()

if collection, ok := m.collections[collectionID]; ok {
// the schema may be changed even the collection is loaded
collection.schema.Store(schema)
collection.Ref(1)
return
return nil
}

log.Info("put new collection", zap.Int64("collectionID", collectionID), zap.Any("schema", schema))
collection := NewCollection(collectionID, schema, meta, loadMeta)
collection, err := NewCollection(collectionID, schema, meta, loadMeta)
if err != nil {
return err
}
collection.Ref(1)
m.collections[collectionID] = collection
m.updateMetric()
return nil
}

func (m *collectionManager) updateMetric() {
Expand Down Expand Up @@ -245,7 +249,7 @@ func (c *Collection) Unref(count uint32) uint32 {
}

// newCollection returns a new Collection
func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadMetaInfo *querypb.LoadMetaInfo) *Collection {
func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadMetaInfo *querypb.LoadMetaInfo) (*Collection, error) {
/*
CCollection
NewCollection(const char* schema_proto_blob);
Expand Down Expand Up @@ -281,7 +285,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
ccollection, err := segcore.CreateCCollection(req)
if err != nil {
log.Warn("create collection failed", zap.Error(err))
return nil
return nil, err
}
coll := &Collection{
ccollection: ccollection,
Expand All @@ -300,7 +304,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
}
coll.schema.Store(schema)

return coll
return coll, nil
}

// Only for test
Expand Down
8 changes: 5 additions & 3 deletions internal/querynodev2/segments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ func (s *ManagerSuite) SetupTest() {

for i, id := range s.segmentIDs {
schema := mock_segcore.GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64, true)
collection, err := NewCollection(s.collectionIDs[i], schema, mock_segcore.GenTestIndexMeta(s.collectionIDs[i], schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
s.Require().NoError(err)
segment, err := NewSegment(
context.Background(),
NewCollection(s.collectionIDs[i], schema, mock_segcore.GenTestIndexMeta(s.collectionIDs[i], schema), &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
}),
collection,
s.types[i],
0,
&querypb.SegmentLoadInfo{
Expand Down
23 changes: 18 additions & 5 deletions internal/querynodev2/segments/mock_collection_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/querynodev2/segments/segment_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,8 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
PartitionIDs: []int64{suite.partitionID},
}

collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta)
collection, err := NewCollection(suite.collectionID, schema, indexMeta, loadMeta)
suite.Require().NoError(err)
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection).Maybe()
}

Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ func (suite *QueryNodeSuite) TestStop() {
suite.node.manager = segments.NewManager()

schema := mock_segcore.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
collection, err := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
suite.Require().NoError(err)
segment, err := segments.NewSegment(
context.Background(),
collection,
Expand Down
12 changes: 10 additions & 2 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,12 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
return merr.Success(), nil
}

node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
err := node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
node.composeIndexMeta(ctx, req.GetIndexInfoList(), req.Schema), req.GetLoadMeta())
if err != nil {
log.Warn("failed to ref collection", zap.Error(err))
return merr.Status(err), nil
}
defer func() {
if !merr.Ok(status) {
node.manager.Collection.Unref(req.GetCollectionID(), 1)
Expand Down Expand Up @@ -474,8 +478,12 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
return merr.Success(), nil
}

node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
err := node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
node.composeIndexMeta(ctx, req.GetIndexInfoList(), req.GetSchema()), req.GetLoadMeta())
if err != nil {
log.Warn("failed to ref collection", zap.Error(err))
return merr.Status(err), nil
}
defer node.manager.Collection.Unref(req.GetCollectionID(), 1)

if req.GetLoadScope() == querypb.LoadScope_Delta {
Expand Down
Loading
Loading