diff --git a/cmd/tools/config/generate.go b/cmd/tools/config/generate.go
index fe8b1e39a2b33..acacb43b603f5 100644
--- a/cmd/tools/config/generate.go
+++ b/cmd/tools/config/generate.go
@@ -68,6 +68,9 @@ func collectRecursive(params *paramtable.ComponentParam, data *[]DocContent, val
item := subVal.Interface().(paramtable.ParamItem) //nolint:govet
refreshable := tag.Get("refreshable")
defaultValue := params.GetWithDefault(item.Key, item.DefaultValue)
+ if strings.HasPrefix(item.DefaultValue, "\"") && strings.HasSuffix(item.DefaultValue, "\"") {
+ defaultValue = fmt.Sprintf("\"%s\"", defaultValue)
+ }
log.Debug("got key", zap.String("key", item.Key), zap.Any("value", defaultValue), zap.String("variable", val.Type().Field(j).Name))
*data = append(*data, DocContent{item.Key, defaultValue, item.Version, refreshable, item.Export, item.Doc})
} else if t == "paramtable.ParamGroup" {
diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index d265b4d269513..168932efe9e1a 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -796,7 +796,7 @@ common:
# The superusers will ignore some system check processes,
# like the old password verification when updating the credential
superUsers:
- defaultRootPassword: Milvus # default password for root user
+ defaultRootPassword: "Milvus" # default password for root user. The maximum length is 72 characters, and double quotes are required.
rbac:
overrideBuiltInPrivilgeGroups:
enabled: false # Whether to override build-in privilege groups
diff --git a/internal/http/static/index.html b/internal/http/static/index.html
index 0a12f181e62ee..246f8970db532 100644
--- a/internal/http/static/index.html
+++ b/internal/http/static/index.html
@@ -157,7 +157,7 @@
Result
if (xhr.status === 200) {
document.getElementById('resultText').textContent = JSON.stringify(JSON.parse(xhr.responseText), null, 2);
} else {
- document.getElementById('resultText').textContent = 'Error: ' + xhr.status;
+ document.getElementById('resultText').textContent = `Error: ${xhr.status}, detail: ${xhr.responseText}`;
}
};
xhr.send();
diff --git a/internal/metastore/model/database.go b/internal/metastore/model/database.go
index 52fe49fb82243..95166a65b5de6 100644
--- a/internal/metastore/model/database.go
+++ b/internal/metastore/model/database.go
@@ -31,8 +31,8 @@ func NewDatabase(id int64, name string, state pb.DatabaseState, properties []*co
}
}
-func NewDefaultDatabase() *Database {
- return NewDatabase(util.DefaultDBID, util.DefaultDBName, pb.DatabaseState_DatabaseCreated, nil)
+func NewDefaultDatabase(prop []*commonpb.KeyValuePair) *Database {
+ return NewDatabase(util.DefaultDBID, util.DefaultDBName, pb.DatabaseState_DatabaseCreated, prop)
}
func (c *Database) Available() bool {
diff --git a/internal/metastore/model/database_test.go b/internal/metastore/model/database_test.go
index 8effb2217acd2..622674d285171 100644
--- a/internal/metastore/model/database_test.go
+++ b/internal/metastore/model/database_test.go
@@ -59,5 +59,5 @@ func TestDatabaseCloneAndEqual(t *testing.T) {
func TestDatabaseAvailable(t *testing.T) {
assert.True(t, dbModel.Available())
- assert.True(t, NewDefaultDatabase().Available())
+ assert.True(t, NewDefaultDatabase(nil).Available())
}
diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go
index 7a5dea1fd047d..77723504c93c2 100644
--- a/internal/proxy/meta_cache.go
+++ b/internal/proxy/meta_cache.go
@@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/conc"
+ "github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -58,7 +59,7 @@ type Cache interface {
// GetCollectionName get collection's name and database by id
GetCollectionName(ctx context.Context, database string, collectionID int64) (string, error)
// GetCollectionInfo get collection's information by name or collection id, such as schema, and etc.
- GetCollectionInfo(ctx context.Context, database, collectionName string, collectionID int64) (*collectionBasicInfo, error)
+ GetCollectionInfo(ctx context.Context, database, collectionName string, collectionID int64) (*collectionInfo, error)
// GetPartitionID get partition's identifier of specific collection.
GetPartitionID(ctx context.Context, database, collectionName string, partitionName string) (typeutil.UniqueID, error)
// GetPartitions get all partitions' id of specific collection.
@@ -93,13 +94,6 @@ type Cache interface {
// AllocID is only using on requests that need to skip timestamp allocation, don't overuse it.
AllocID(ctx context.Context) (int64, error)
}
-type collectionBasicInfo struct {
- collID typeutil.UniqueID
- createdTimestamp uint64
- createdUtcTimestamp uint64
- consistencyLevel commonpb.ConsistencyLevel
- partitionKeyIsolation bool
-}
type collectionInfo struct {
collID typeutil.UniqueID
@@ -267,20 +261,7 @@ type partitionInfo struct {
partitionID typeutil.UniqueID
createdTimestamp uint64
createdUtcTimestamp uint64
-}
-
-// getBasicInfo get a basic info by deep copy.
-func (info *collectionInfo) getBasicInfo() *collectionBasicInfo {
- // Do a deep copy for all fields.
- basicInfo := &collectionBasicInfo{
- collID: info.collID,
- createdTimestamp: info.createdTimestamp,
- createdUtcTimestamp: info.createdUtcTimestamp,
- consistencyLevel: info.consistencyLevel,
- partitionKeyIsolation: info.partitionKeyIsolation,
- }
-
- return basicInfo
+ isDefault bool
}
func (info *collectionInfo) isCollectionCached() bool {
@@ -368,6 +349,7 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo
if err != nil {
return err
}
+ expr.Register("cache", globalMetaCache)
// The privilege info is a little more. And to get this info, the query operation of involving multiple table queries is required.
resp, err := rootCoord.ListPolicy(ctx, &internalpb.ListPolicyRequest{})
@@ -458,12 +440,14 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
return nil, merr.WrapErrParameterInvalidMsg("partition names and timestamps number is not aligned, response: %s", partitions.String())
}
+ defaultPartitionName := Params.CommonCfg.DefaultPartitionName.GetValue()
infos := lo.Map(partitions.GetPartitionIDs(), func(partitionID int64, idx int) *partitionInfo {
return &partitionInfo{
name: partitions.PartitionNames[idx],
partitionID: partitions.PartitionIDs[idx],
createdTimestamp: partitions.CreatedTimestamps[idx],
createdUtcTimestamp: partitions.CreatedUtcTimestamps[idx],
+ isDefault: partitions.PartitionNames[idx] == defaultPartitionName,
}
})
@@ -568,7 +552,7 @@ func (m *MetaCache) GetCollectionName(ctx context.Context, database string, coll
return collInfo.schema.Name, nil
}
-func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, collectionName string, collectionID int64) (*collectionBasicInfo, error) {
+func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, collectionName string, collectionID int64) (*collectionInfo, error) {
collInfo, ok := m.getCollection(database, collectionName, 0)
method := "GetCollectionInfo"
@@ -583,11 +567,11 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, coll
return nil, err
}
metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return collInfo.getBasicInfo(), nil
+ return collInfo, nil
}
metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc()
- return collInfo.getBasicInfo(), nil
+ return collInfo, nil
}
// GetCollectionInfo returns the collection information related to provided collection name
@@ -661,6 +645,14 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, database, collectionNa
return nil, err
}
+ if partitionName == "" {
+ for _, info := range partitions.partitionInfos {
+ if info.isDefault {
+ return info, nil
+ }
+ }
+ }
+
info, ok := partitions.name2Info[partitionName]
if !ok {
return nil, merr.WrapErrPartitionNotFound(partitionName)
@@ -718,30 +710,14 @@ func (m *MetaCache) describeCollection(ctx context.Context, database, collection
if err != nil {
return nil, err
}
- resp := &milvuspb.DescribeCollectionResponse{
- Status: coll.Status,
- Schema: &schemapb.CollectionSchema{
- Name: coll.Schema.Name,
- Description: coll.Schema.Description,
- AutoID: coll.Schema.AutoID,
- Fields: make([]*schemapb.FieldSchema, 0),
- EnableDynamicField: coll.Schema.EnableDynamicField,
- },
- CollectionID: coll.CollectionID,
- VirtualChannelNames: coll.VirtualChannelNames,
- PhysicalChannelNames: coll.PhysicalChannelNames,
- CreatedTimestamp: coll.CreatedTimestamp,
- CreatedUtcTimestamp: coll.CreatedUtcTimestamp,
- ConsistencyLevel: coll.ConsistencyLevel,
- DbName: coll.GetDbName(),
- Properties: coll.Properties,
- }
+ userFields := make([]*schemapb.FieldSchema, 0)
for _, field := range coll.Schema.Fields {
if field.FieldID >= common.StartOfUserFieldID {
- resp.Schema.Fields = append(resp.Schema.Fields, field)
+ userFields = append(userFields, field)
}
}
- return resp, nil
+ coll.Schema.Fields = userFields
+ return coll, nil
}
func (m *MetaCache) showPartitions(ctx context.Context, dbName string, collectionName string, collectionID UniqueID) (*milvuspb.ShowPartitionsResponse, error) {
diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go
index 475ed2240f69f..3e8ba8ce6eb3d 100644
--- a/internal/proxy/meta_cache_test.go
+++ b/internal/proxy/meta_cache_test.go
@@ -57,6 +57,25 @@ type MockRootCoordClientInterface struct {
listPolicy func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error)
}
+func EqualSchema(t *testing.T, expect, actual *schemapb.CollectionSchema) {
+ assert.Equal(t, expect.AutoID, actual.AutoID)
+ assert.Equal(t, expect.Description, actual.Description)
+ assert.Equal(t, expect.Name, actual.Name)
+ assert.Equal(t, expect.EnableDynamicField, actual.EnableDynamicField)
+ assert.Equal(t, len(expect.Fields), len(actual.Fields))
+ for i := range expect.Fields {
+ assert.Equal(t, expect.Fields[i], actual.Fields[i])
+ }
+ // assert.Equal(t, len(expect.Functions), len(actual.Functions))
+ // for i := range expect.Functions {
+ // assert.Equal(t, expect.Functions[i], actual.Functions[i])
+ // }
+ assert.Equal(t, len(expect.Properties), len(actual.Properties))
+ for i := range expect.Properties {
+ assert.Equal(t, expect.Properties[i], actual.Properties[i])
+ }
+}
+
func (m *MockRootCoordClientInterface) IncAccessCount() {
atomic.AddInt32(&m.AccessCount, 1)
}
@@ -212,7 +231,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 1)
assert.NoError(t, err)
- assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
+ EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -224,7 +243,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection2")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
- assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
+ EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection2",
@@ -238,7 +257,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
- assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
+ EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -323,7 +342,7 @@ func TestMetaCache_GetCollectionName(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
- assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
+ EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -348,7 +367,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
- assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
+ EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -357,7 +376,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {
rootCoord.Error = true
// should be cached with no error
assert.NoError(t, err)
- assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
+ EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -421,7 +440,7 @@ func TestMetaCache_ConcurrentTest1(t *testing.T) {
// GetCollectionSchema will never fail
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
- assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
+ EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
diff --git a/internal/proxy/mock_cache.go b/internal/proxy/mock_cache.go
index 2601a2a66203e..e33dd84263935 100644
--- a/internal/proxy/mock_cache.go
+++ b/internal/proxy/mock_cache.go
@@ -165,19 +165,23 @@ func (_c *MockCache_GetCollectionID_Call) RunAndReturn(run func(context.Context,
}
// GetCollectionInfo provides a mock function with given fields: ctx, database, collectionName, collectionID
-func (_m *MockCache) GetCollectionInfo(ctx context.Context, database string, collectionName string, collectionID int64) (*collectionBasicInfo, error) {
+func (_m *MockCache) GetCollectionInfo(ctx context.Context, database string, collectionName string, collectionID int64) (*collectionInfo, error) {
ret := _m.Called(ctx, database, collectionName, collectionID)
- var r0 *collectionBasicInfo
+ if len(ret) == 0 {
+ panic("no return value specified for GetCollectionInfo")
+ }
+
+ var r0 *collectionInfo
var r1 error
- if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) (*collectionBasicInfo, error)); ok {
+ if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) (*collectionInfo, error)); ok {
return rf(ctx, database, collectionName, collectionID)
}
- if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) *collectionBasicInfo); ok {
+ if rf, ok := ret.Get(0).(func(context.Context, string, string, int64) *collectionInfo); ok {
r0 = rf(ctx, database, collectionName, collectionID)
} else {
if ret.Get(0) != nil {
- r0 = ret.Get(0).(*collectionBasicInfo)
+ r0 = ret.Get(0).(*collectionInfo)
}
}
@@ -211,12 +215,12 @@ func (_c *MockCache_GetCollectionInfo_Call) Run(run func(ctx context.Context, da
return _c
}
-func (_c *MockCache_GetCollectionInfo_Call) Return(_a0 *collectionBasicInfo, _a1 error) *MockCache_GetCollectionInfo_Call {
+func (_c *MockCache_GetCollectionInfo_Call) Return(_a0 *collectionInfo, _a1 error) *MockCache_GetCollectionInfo_Call {
_c.Call.Return(_a0, _a1)
return _c
}
-func (_c *MockCache_GetCollectionInfo_Call) RunAndReturn(run func(context.Context, string, string, int64) (*collectionBasicInfo, error)) *MockCache_GetCollectionInfo_Call {
+func (_c *MockCache_GetCollectionInfo_Call) RunAndReturn(run func(context.Context, string, string, int64) (*collectionInfo, error)) *MockCache_GetCollectionInfo_Call {
_c.Call.Return(run)
return _c
}
diff --git a/internal/proxy/msg_pack.go b/internal/proxy/msg_pack.go
index 426cc0a9d1f7b..96fe24eb5ead3 100644
--- a/internal/proxy/msg_pack.go
+++ b/internal/proxy/msg_pack.go
@@ -58,6 +58,7 @@ func genInsertMsgsByPartition(ctx context.Context,
),
CollectionID: insertMsg.CollectionID,
PartitionID: partitionID,
+ DbName: insertMsg.DbName,
CollectionName: insertMsg.CollectionName,
PartitionName: partitionName,
SegmentID: segmentID,
diff --git a/internal/proxy/rate_limit_interceptor.go b/internal/proxy/rate_limit_interceptor.go
index ca4b99c23e30b..996961fd8000d 100644
--- a/internal/proxy/rate_limit_interceptor.go
+++ b/internal/proxy/rate_limit_interceptor.go
@@ -84,7 +84,13 @@ func getCollectionAndPartitionID(ctx context.Context, r reqPartName) (int64, map
return 0, nil, err
}
if r.GetPartitionName() == "" {
- return db.dbID, map[int64][]int64{collectionID: {}}, nil
+ collectionSchema, err := globalMetaCache.GetCollectionSchema(ctx, r.GetDbName(), r.GetCollectionName())
+ if err != nil {
+ return 0, nil, err
+ }
+ if collectionSchema.IsPartitionKeyCollection() {
+ return db.dbID, map[int64][]int64{collectionID: {}}, nil
+ }
}
part, err := globalMetaCache.GetPartitionInfo(ctx, r.GetDbName(), r.GetCollectionName(), r.GetPartitionName())
if err != nil {
diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go
index 56e9345c85abd..22a5c98c326c9 100644
--- a/internal/proxy/rate_limit_interceptor_test.go
+++ b/internal/proxy/rate_limit_interceptor_test.go
@@ -299,6 +299,7 @@ func TestRateLimitInterceptor(t *testing.T) {
dbID: 100,
createdTimestamp: 1,
}, nil)
+ mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{}, nil)
globalMetaCache = mockCache
limiter := limiterMock{rate: 100}
@@ -437,6 +438,41 @@ func TestGetInfo(t *testing.T) {
}
})
+ t.Run("fail to get collection schema", func(t *testing.T) {
+ mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{
+ dbID: 100,
+ createdTimestamp: 1,
+ }, nil).Once()
+ mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil).Once()
+ mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
+
+ _, _, err := getCollectionAndPartitionID(ctx, &milvuspb.InsertRequest{
+ DbName: "foo",
+ CollectionName: "coo",
+ })
+ assert.Error(t, err)
+ })
+
+ t.Run("partition key mode", func(t *testing.T) {
+ mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{
+ dbID: 100,
+ createdTimestamp: 1,
+ }, nil).Once()
+ mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil).Once()
+ mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
+ hasPartitionKeyField: true,
+ }, nil).Once()
+
+ db, col2par, err := getCollectionAndPartitionID(ctx, &milvuspb.InsertRequest{
+ DbName: "foo",
+ CollectionName: "coo",
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, int64(100), db)
+ assert.NotNil(t, col2par[1])
+ assert.Equal(t, 0, len(col2par[1]))
+ })
+
t.Run("fail to get partition", func(t *testing.T) {
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{
dbID: 100,
@@ -467,11 +503,12 @@ func TestGetInfo(t *testing.T) {
dbID: 100,
createdTimestamp: 1,
}, nil).Times(3)
+ mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{}, nil).Times(1)
mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(int64(10), nil).Times(3)
mockCache.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&partitionInfo{
name: "p1",
partitionID: 100,
- }, nil).Twice()
+ }, nil).Times(3)
{
db, col2par, err := getCollectionAndPartitionID(ctx, &milvuspb.InsertRequest{
DbName: "foo",
@@ -491,7 +528,7 @@ func TestGetInfo(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(100), db)
assert.NotNil(t, col2par[10])
- assert.Equal(t, 0, len(col2par[10]))
+ assert.Equal(t, int64(100), col2par[10][0])
}
{
db, col2par, err := getCollectionAndPartitionIDs(ctx, &milvuspb.SearchRequest{
diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go
index 451d9375c3442..df5403aea5b65 100644
--- a/internal/proxy/task_delete.go
+++ b/internal/proxy/task_delete.go
@@ -55,6 +55,7 @@ type deleteTask struct {
primaryKeys *schemapb.IDs
collectionID UniqueID
partitionID UniqueID
+ dbID UniqueID
partitionKeyMode bool
// set by scheduler
@@ -145,14 +146,11 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
result, numRows, err := repackDeleteMsgByHash(
ctx,
- dt.primaryKeys,
- dt.vChannels,
- dt.idAllocator,
- dt.ts,
- dt.collectionID,
- dt.req.GetCollectionName(),
- dt.partitionID,
- dt.req.GetPartitionName(),
+ dt.primaryKeys, dt.vChannels,
+ dt.idAllocator, dt.ts,
+ dt.collectionID, dt.req.GetCollectionName(),
+ dt.partitionID, dt.req.GetPartitionName(),
+ dt.req.GetDbName(),
)
if err != nil {
return err
@@ -200,6 +198,7 @@ func repackDeleteMsgByHash(
collectionName string,
partitionID int64,
partitionName string,
+ dbName string,
) (map[uint32][]*msgstream.DeleteMsg, int64, error) {
maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt()
hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels)
@@ -229,6 +228,7 @@ func repackDeleteMsgByHash(
PartitionID: partitionID,
CollectionName: collectionName,
PartitionName: partitionName,
+ DbName: dbName,
PrimaryKeys: &schemapb.IDs{},
ShardName: vchannel,
},
@@ -408,6 +408,7 @@ func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs)
partitionKeyMode: dr.partitionKeyMode,
vChannels: dr.vChannels,
primaryKeys: primaryKeys,
+ dbID: dr.dbID,
}
if err := dr.queue.Enqueue(task); err != nil {
diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go
index fd86fc9d3c343..620b469d2fa96 100644
--- a/internal/proxy/task_insert.go
+++ b/internal/proxy/task_insert.go
@@ -202,7 +202,12 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
// insert to _default partition
partitionTag := it.insertMsg.GetPartitionName()
if len(partitionTag) <= 0 {
- partitionTag = Params.CommonCfg.DefaultPartitionName.GetValue()
+ pinfo, err := globalMetaCache.GetPartitionInfo(ctx, it.insertMsg.GetDbName(), collectionName, "")
+ if err != nil {
+ log.Warn("get partition info failed", zap.String("collectionName", collectionName), zap.Error(err))
+ return err
+ }
+ partitionTag = pinfo.name
it.insertMsg.PartitionName = partitionTag
}
diff --git a/internal/proxy/task_query_test.go b/internal/proxy/task_query_test.go
index 34c7e7bd4e39c..fe54b8f22f569 100644
--- a/internal/proxy/task_query_test.go
+++ b/internal/proxy/task_query_test.go
@@ -993,7 +993,7 @@ func TestQueryTask_CanSkipAllocTimestamp(t *testing.T) {
}
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, nil)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil).Once()
@@ -1002,7 +1002,7 @@ func TestQueryTask_CanSkipAllocTimestamp(t *testing.T) {
assert.True(t, skip)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Bounded,
}, nil).Once()
@@ -1010,7 +1010,7 @@ func TestQueryTask_CanSkipAllocTimestamp(t *testing.T) {
assert.True(t, skip)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Strong,
}, nil).Once()
@@ -1020,7 +1020,7 @@ func TestQueryTask_CanSkipAllocTimestamp(t *testing.T) {
t.Run("request consistency level", func(t *testing.T) {
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil).Times(3)
@@ -1092,7 +1092,7 @@ func TestQueryTask_CanSkipAllocTimestamp(t *testing.T) {
mockMetaCache.ExpectedCalls = nil
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, fmt.Errorf("mock error"))
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil)
diff --git a/internal/proxy/task_scheduler_test.go b/internal/proxy/task_scheduler_test.go
index 9b16150b4cf5e..23f405ac47a41 100644
--- a/internal/proxy/task_scheduler_test.go
+++ b/internal/proxy/task_scheduler_test.go
@@ -621,7 +621,7 @@ func TestTaskScheduler_SkipAllocTimestamp(t *testing.T) {
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, nil)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil)
diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go
index d0f41b9d5cdf0..e80ade6d087dc 100644
--- a/internal/proxy/task_search_test.go
+++ b/internal/proxy/task_search_test.go
@@ -2207,7 +2207,7 @@ func TestSearchTask_Requery(t *testing.T) {
cache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collectionID, nil).Maybe()
cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schema, nil).Maybe()
cache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"_default": UniqueID(1)}, nil).Maybe()
- cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionBasicInfo{}, nil).Maybe()
+ cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
cache.EXPECT().GetShards(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(map[string][]nodeInfo{}, nil).Maybe()
cache.EXPECT().DeprecateShardCache(mock.Anything, mock.Anything).Return().Maybe()
globalMetaCache = cache
@@ -2526,7 +2526,7 @@ func TestSearchTask_CanSkipAllocTimestamp(t *testing.T) {
}
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, nil)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil).Once()
@@ -2535,7 +2535,7 @@ func TestSearchTask_CanSkipAllocTimestamp(t *testing.T) {
assert.True(t, skip)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Bounded,
}, nil).Once()
@@ -2543,7 +2543,7 @@ func TestSearchTask_CanSkipAllocTimestamp(t *testing.T) {
assert.True(t, skip)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Strong,
}, nil).Once()
@@ -2553,7 +2553,7 @@ func TestSearchTask_CanSkipAllocTimestamp(t *testing.T) {
t.Run("request consistency level", func(t *testing.T) {
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil).Times(3)
@@ -2625,7 +2625,7 @@ func TestSearchTask_CanSkipAllocTimestamp(t *testing.T) {
mockMetaCache.ExpectedCalls = nil
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, fmt.Errorf("mock error"))
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil)
@@ -2680,7 +2680,7 @@ func (s *MaterializedViewTestSuite) SetupTest() {
s.mockMetaCache = NewMockCache(s.T())
s.mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.colID, nil)
s.mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
- &collectionBasicInfo{
+ &collectionInfo{
collID: s.colID,
partitionKeyIsolation: true,
}, nil)
diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go
index 6240feae49eb1..ecfd474f13902 100644
--- a/internal/proxy/task_test.go
+++ b/internal/proxy/task_test.go
@@ -3576,6 +3576,204 @@ func TestPartitionKey(t *testing.T) {
})
}
+func TestDefaultPartition(t *testing.T) {
+ rc := NewRootCoordMock()
+
+ defer rc.Close()
+ qc := getQueryCoordClient()
+ qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Maybe()
+
+ ctx := context.Background()
+
+ mgr := newShardClientMgr()
+ err := InitMetaCache(ctx, rc, qc, mgr)
+ assert.NoError(t, err)
+
+ shardsNum := common.DefaultShardsNum
+	prefix := "TestDefaultPartition"
+ collectionName := prefix + funcutil.GenRandomStr()
+
+ fieldName2Type := make(map[string]schemapb.DataType)
+ fieldName2Type["int64_field"] = schemapb.DataType_Int64
+ fieldName2Type["varChar_field"] = schemapb.DataType_VarChar
+ fieldName2Type["fvec_field"] = schemapb.DataType_FloatVector
+ schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false)
+ marshaledSchema, err := proto.Marshal(schema)
+ assert.NoError(t, err)
+
+ t.Run("create collection", func(t *testing.T) {
+ createCollectionTask := &createCollectionTask{
+ Condition: NewTaskCondition(ctx),
+ CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
+ Base: &commonpb.MsgBase{
+ MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
+ Timestamp: Timestamp(time.Now().UnixNano()),
+ },
+ DbName: "",
+ CollectionName: collectionName,
+ Schema: marshaledSchema,
+ ShardsNum: shardsNum,
+ },
+ ctx: ctx,
+ rootCoord: rc,
+ result: nil,
+ schema: nil,
+ }
+ err = createCollectionTask.PreExecute(ctx)
+ assert.NoError(t, err)
+ err = createCollectionTask.Execute(ctx)
+ assert.NoError(t, err)
+ })
+
+ collectionID, err := globalMetaCache.GetCollectionID(ctx, GetCurDBNameFromContextOrDefault(ctx), collectionName)
+ assert.NoError(t, err)
+
+ dmlChannelsFunc := getDmlChannelsFunc(ctx, rc)
+ factory := newSimpleMockMsgStreamFactory()
+ chMgr := newChannelsMgrImpl(dmlChannelsFunc, nil, factory)
+ defer chMgr.removeAllDMLStream()
+
+ _, err = chMgr.getOrCreateDmlStream(collectionID)
+ assert.NoError(t, err)
+ pchans, err := chMgr.getChannels(collectionID)
+ assert.NoError(t, err)
+
+ interval := time.Millisecond * 10
+ tso := newMockTsoAllocator()
+
+ ticker := newChannelsTimeTicker(ctx, interval, []string{}, newGetStatisticsFunc(pchans), tso)
+ _ = ticker.start()
+ defer ticker.close()
+
+ idAllocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID())
+ assert.NoError(t, err)
+ _ = idAllocator.Start()
+ defer idAllocator.Close()
+
+ segAllocator, err := newSegIDAssigner(ctx, &mockDataCoord{expireTime: Timestamp(2500)}, getLastTick1)
+ assert.NoError(t, err)
+ segAllocator.Init()
+ _ = segAllocator.Start()
+ defer segAllocator.Close()
+
+ nb := 10
+ fieldID := common.StartOfUserFieldID
+ fieldDatas := make([]*schemapb.FieldData, 0)
+ for fieldName, dataType := range fieldName2Type {
+ fieldData := generateFieldData(dataType, fieldName, nb)
+ fieldData.FieldId = int64(fieldID)
+		fieldDatas = append(fieldDatas, fieldData)
+ fieldID++
+ }
+
+ t.Run("Insert", func(t *testing.T) {
+ it := &insertTask{
+ insertMsg: &BaseInsertTask{
+ BaseMsg: msgstream.BaseMsg{},
+ InsertRequest: &msgpb.InsertRequest{
+ Base: &commonpb.MsgBase{
+ MsgType: commonpb.MsgType_Insert,
+ MsgID: 0,
+ SourceID: paramtable.GetNodeID(),
+ },
+ CollectionName: collectionName,
+ FieldsData: fieldDatas,
+ NumRows: uint64(nb),
+ Version: msgpb.InsertDataVersion_ColumnBased,
+ },
+ },
+
+ Condition: NewTaskCondition(ctx),
+ ctx: ctx,
+ result: &milvuspb.MutationResult{
+ Status: merr.Success(),
+ IDs: nil,
+ SuccIndex: nil,
+ ErrIndex: nil,
+ Acknowledged: false,
+ InsertCnt: 0,
+ DeleteCnt: 0,
+ UpsertCnt: 0,
+ Timestamp: 0,
+ },
+ idAllocator: idAllocator,
+ segIDAssigner: segAllocator,
+ chMgr: chMgr,
+ chTicker: ticker,
+ vChannels: nil,
+ pChannels: nil,
+ schema: nil,
+ }
+
+ it.insertMsg.PartitionName = ""
+ assert.NoError(t, it.OnEnqueue())
+ assert.NoError(t, it.PreExecute(ctx))
+ assert.NoError(t, it.Execute(ctx))
+ assert.NoError(t, it.PostExecute(ctx))
+ })
+
+ t.Run("Upsert", func(t *testing.T) {
+ hash := testutils.GenerateHashKeys(nb)
+ ut := &upsertTask{
+ ctx: ctx,
+ Condition: NewTaskCondition(ctx),
+ baseMsg: msgstream.BaseMsg{
+ HashValues: hash,
+ },
+ req: &milvuspb.UpsertRequest{
+ Base: commonpbutil.NewMsgBase(
+ commonpbutil.WithMsgType(commonpb.MsgType_Upsert),
+ commonpbutil.WithSourceID(paramtable.GetNodeID()),
+ ),
+ CollectionName: collectionName,
+ FieldsData: fieldDatas,
+ NumRows: uint32(nb),
+ },
+
+ result: &milvuspb.MutationResult{
+ Status: merr.Success(),
+ IDs: &schemapb.IDs{
+ IdField: nil,
+ },
+ },
+ idAllocator: idAllocator,
+ segIDAssigner: segAllocator,
+ chMgr: chMgr,
+ chTicker: ticker,
+ }
+
+ ut.req.PartitionName = ""
+ assert.NoError(t, ut.OnEnqueue())
+ assert.NoError(t, ut.PreExecute(ctx))
+ assert.NoError(t, ut.Execute(ctx))
+ assert.NoError(t, ut.PostExecute(ctx))
+ })
+
+ t.Run("delete", func(t *testing.T) {
+ dt := &deleteTask{
+ Condition: NewTaskCondition(ctx),
+ req: &milvuspb.DeleteRequest{
+ CollectionName: collectionName,
+ Expr: "int64_field in [0, 1]",
+ },
+ ctx: ctx,
+ primaryKeys: &schemapb.IDs{
+ IdField: &schemapb.IDs_IntId{IntId: &schemapb.LongArray{Data: []int64{0, 1}}},
+ },
+ idAllocator: idAllocator,
+ chMgr: chMgr,
+ chTicker: ticker,
+ collectionID: collectionID,
+ vChannels: []string{"test-channel"},
+ }
+
+ dt.req.PartitionName = ""
+ assert.NoError(t, dt.PreExecute(ctx))
+ assert.NoError(t, dt.Execute(ctx))
+ assert.NoError(t, dt.PostExecute(ctx))
+ })
+}
+
func TestClusteringKey(t *testing.T) {
rc := NewRootCoordMock()
diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go
index 825323c1d33df..b2a5528826988 100644
--- a/internal/proxy/task_upsert.go
+++ b/internal/proxy/task_upsert.go
@@ -317,8 +317,12 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
// insert to _default partition
partitionTag := it.req.GetPartitionName()
if len(partitionTag) <= 0 {
- partitionTag = Params.CommonCfg.DefaultPartitionName.GetValue()
- it.req.PartitionName = partitionTag
+ pinfo, err := globalMetaCache.GetPartitionInfo(ctx, it.req.GetDbName(), collectionName, "")
+ if err != nil {
+ log.Warn("get partition info failed", zap.String("collectionName", collectionName), zap.Error(err))
+ return err
+ }
+ it.req.PartitionName = pinfo.name
}
}
diff --git a/internal/querynodev2/pipeline/manager_test.go b/internal/querynodev2/pipeline/manager_test.go
index e1654cd462df1..d8479dcf3f4e8 100644
--- a/internal/querynodev2/pipeline/manager_test.go
+++ b/internal/querynodev2/pipeline/manager_test.go
@@ -101,7 +101,7 @@ func (suite *PipelineManagerTestSuite) TestBasic() {
suite.NotNil(pipeline)
// Init Consumer
- err = pipeline.ConsumeMsgStream(&msgpb.MsgPosition{})
+ err = pipeline.ConsumeMsgStream(context.Background(), &msgpb.MsgPosition{})
suite.NoError(err)
// Start pipeline
diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go
index 3dca8e674cc53..8e2edde82016b 100644
--- a/internal/querynodev2/pipeline/pipeline_test.go
+++ b/internal/querynodev2/pipeline/pipeline_test.go
@@ -147,7 +147,7 @@ func (suite *PipelineTestSuite) TestBasic() {
suite.NoError(err)
// Init Consumer
- err = pipeline.ConsumeMsgStream(&msgpb.MsgPosition{})
+ err = pipeline.ConsumeMsgStream(context.Background(), &msgpb.MsgPosition{})
suite.NoError(err)
err = pipeline.Start()
diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index 67eb84346c704..45e0334e9bbf9 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -326,7 +326,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
MsgID: channel.SeekPosition.MsgID,
Timestamp: channel.SeekPosition.Timestamp,
}
- err = pipeline.ConsumeMsgStream(position)
+ err = pipeline.ConsumeMsgStream(ctx, position)
if err != nil {
err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed")
log.Warn(err.Error(),
diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go
index b24b14fc80245..d5cfc3542a1f5 100644
--- a/internal/rootcoord/create_collection_task.go
+++ b/internal/rootcoord/create_collection_task.go
@@ -567,6 +567,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
func (t *createCollectionTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
- NewDatabaseLockerKey(t.Req.GetDbName(), true),
+ NewDatabaseLockerKey(t.Req.GetDbName(), false),
+ NewCollectionLockerKey(t.Req.GetCollectionName(), true),
)
}
diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go
index 6d1bdb56c9c3c..0ba27388dde9f 100644
--- a/internal/rootcoord/create_collection_task_test.go
+++ b/internal/rootcoord/create_collection_task_test.go
@@ -619,7 +619,7 @@ func Test_createCollectionTask_Prepare(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
- ).Return(model.NewDefaultDatabase(), nil)
+ ).Return(model.NewDefaultDatabase(nil), nil)
meta.On("ListAllAvailCollections",
mock.Anything,
).Return(map[int64][]int64{
@@ -1028,7 +1028,7 @@ func Test_createCollectionTask_PartitionKey(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
- ).Return(model.NewDefaultDatabase(), nil)
+ ).Return(model.NewDefaultDatabase(nil), nil)
meta.On("ListAllAvailCollections",
mock.Anything,
).Return(map[int64][]int64{
diff --git a/internal/rootcoord/create_db_task_test.go b/internal/rootcoord/create_db_task_test.go
index 09fb9c5a2397c..5f83139150d6e 100644
--- a/internal/rootcoord/create_db_task_test.go
+++ b/internal/rootcoord/create_db_task_test.go
@@ -54,7 +54,7 @@ func Test_CreateDBTask_Prepare(t *testing.T) {
cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt()
dbs := make([]*model.Database, 0, cfgMaxDatabaseNum)
for i := 0; i < cfgMaxDatabaseNum; i++ {
- dbs = append(dbs, model.NewDefaultDatabase())
+ dbs = append(dbs, model.NewDefaultDatabase(nil))
}
meta.On("ListDatabases",
mock.Anything,
@@ -81,7 +81,7 @@ func Test_CreateDBTask_Prepare(t *testing.T) {
meta.On("ListDatabases",
mock.Anything,
mock.Anything).
- Return([]*model.Database{model.NewDefaultDatabase()}, nil)
+ Return([]*model.Database{model.NewDefaultDatabase(nil)}, nil)
core := newTestCore(withMeta(meta), withValidIDAllocator())
paramtable.Get().Save(Params.RootCoordCfg.MaxDatabaseNum.Key, strconv.Itoa(10))
diff --git a/internal/rootcoord/describe_db_task_test.go b/internal/rootcoord/describe_db_task_test.go
index 9d86708d92f42..2d9b8a4f5ef32 100644
--- a/internal/rootcoord/describe_db_task_test.go
+++ b/internal/rootcoord/describe_db_task_test.go
@@ -48,7 +48,7 @@ func Test_describeDatabaseTask_Execute(t *testing.T) {
t.Run("describe with empty database name", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetDatabaseByName(mock.Anything, mock.Anything, mock.Anything).
- Return(model.NewDefaultDatabase(), nil)
+ Return(model.NewDefaultDatabase(nil), nil)
core := newTestCore(withMeta(meta))
task := &describeDBTask{
diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go
index a5a65b1cd1d85..f7015267be731 100644
--- a/internal/rootcoord/drop_collection_task.go
+++ b/internal/rootcoord/drop_collection_task.go
@@ -121,5 +121,9 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
}
func (t *dropCollectionTask) GetLockerKey() LockerKey {
- return NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey(t.Req.GetDbName(), true))
+ return NewLockerKeyChain(
+ NewClusterLockerKey(false),
+ NewDatabaseLockerKey(t.Req.GetDbName(), false),
+ NewCollectionLockerKey(t.Req.GetCollectionName(), true),
+ )
}
diff --git a/internal/rootcoord/list_db_task_test.go b/internal/rootcoord/list_db_task_test.go
index 29d4feb3a62c5..db5ac702535fd 100644
--- a/internal/rootcoord/list_db_task_test.go
+++ b/internal/rootcoord/list_db_task_test.go
@@ -58,7 +58,7 @@ func Test_ListDBTask(t *testing.T) {
})
t.Run("ok", func(t *testing.T) {
- ret := []*model.Database{model.NewDefaultDatabase()}
+ ret := []*model.Database{model.NewDefaultDatabase(nil)}
meta := mockrootcoord.NewIMetaTable(t)
meta.On("ListDatabases",
mock.Anything,
@@ -89,7 +89,7 @@ func Test_ListDBTask(t *testing.T) {
t.Run("list db with auth", func(t *testing.T) {
Params.Save(Params.CommonCfg.AuthorizationEnabled.Key, "true")
defer Params.Reset(Params.CommonCfg.AuthorizationEnabled.Key)
- ret := []*model.Database{model.NewDefaultDatabase()}
+ ret := []*model.Database{model.NewDefaultDatabase(nil)}
meta := mockrootcoord.NewIMetaTable(t)
core := newTestCore(withMeta(meta))
diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go
index f866353ba70ae..363be64441b2d 100644
--- a/internal/rootcoord/meta_table.go
+++ b/internal/rootcoord/meta_table.go
@@ -262,7 +262,13 @@ func (mt *MetaTable) createDefaultDb() error {
return err
}
- return mt.createDatabasePrivate(mt.ctx, model.NewDefaultDatabase(), ts)
+ s := Params.RootCoordCfg.DefaultDBProperties.GetValue()
+ defaultProperties, err := funcutil.String2KeyValuePair(s)
+ if err != nil {
+ return err
+ }
+
+ return mt.createDatabasePrivate(mt.ctx, model.NewDefaultDatabase(defaultProperties), ts)
}
func (mt *MetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error {
diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go
index 3bf93aeb31cdb..fe9e92e499bc2 100644
--- a/internal/rootcoord/meta_table_test.go
+++ b/internal/rootcoord/meta_table_test.go
@@ -460,7 +460,7 @@ func TestMetaTable_getCollectionByIDInternal(t *testing.T) {
meta := &MetaTable{
catalog: catalog,
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
collID2Meta: map[typeutil.UniqueID]*model.Collection{},
}
@@ -480,7 +480,7 @@ func TestMetaTable_getCollectionByIDInternal(t *testing.T) {
meta := &MetaTable{
catalog: catalog,
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
collID2Meta: map[typeutil.UniqueID]*model.Collection{},
}
@@ -548,7 +548,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
},
},
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
}
ctx := context.Background()
@@ -569,7 +569,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
},
},
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
}
meta.aliases.insert(util.DefaultDBName, "alias", 100)
@@ -596,7 +596,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
},
},
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
}
meta.names.insert(util.DefaultDBName, "name", 100)
@@ -618,7 +618,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
).Return(nil, errors.New("error mock GetCollectionByName"))
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
names: newNameDb(),
aliases: newNameDb(),
@@ -639,7 +639,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
).Return(&model.Collection{State: pb.CollectionState_CollectionDropped}, nil)
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
names: newNameDb(),
aliases: newNameDb(),
@@ -669,7 +669,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
names: newNameDb(),
aliases: newNameDb(),
@@ -687,7 +687,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
ctx := context.Background()
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
names: newNameDb(),
aliases: newNameDb(),
@@ -1206,7 +1206,7 @@ func TestMetaTable_reload(t *testing.T) {
catalog.On("ListDatabases",
mock.Anything,
mock.Anything,
- ).Return([]*model.Database{model.NewDefaultDatabase()}, nil)
+ ).Return([]*model.Database{model.NewDefaultDatabase(nil)}, nil)
catalog.On("ListCollections",
mock.Anything,
mock.Anything,
@@ -1508,7 +1508,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
t.Run("new collection name already exist-1", func(t *testing.T) {
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
names: newNameDb(),
aliases: newNameDb(),
@@ -1535,7 +1535,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
).Return(nil, errors.New("error mock GetCollectionByID"))
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
catalog: catalog,
names: newNameDb(),
@@ -1565,7 +1565,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
catalog: catalog,
names: newNameDb(),
@@ -1592,7 +1592,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
).Return(nil, merr.WrapErrCollectionNotFound("error"))
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
"db1": model.NewDatabase(2, "db1", pb.DatabaseState_DatabaseCreated, nil),
},
catalog: catalog,
@@ -1622,7 +1622,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
).Return(nil, merr.WrapErrCollectionNotFound("error"))
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
"db1": model.NewDatabase(2, "db1", pb.DatabaseState_DatabaseCreated, nil),
},
catalog: catalog,
@@ -1659,7 +1659,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
).Return(nil, merr.WrapErrCollectionNotFound("error"))
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
catalog: catalog,
names: newNameDb(),
@@ -1911,7 +1911,7 @@ func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
1: {CollectionID: 1},
},
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
},
}
@@ -1925,7 +1925,7 @@ func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
mt := &MetaTable{
names: newNameDb(),
dbName2Meta: map[string]*model.Database{
- util.DefaultDBName: model.NewDefaultDatabase(),
+ util.DefaultDBName: model.NewDefaultDatabase(nil),
"db2": model.NewDatabase(2, "db2", pb.DatabaseState_DatabaseCreated, nil),
},
collID2Meta: map[typeutil.UniqueID]*model.Collection{
diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go
index d976227e58947..de102201be154 100644
--- a/internal/rootcoord/root_coord.go
+++ b/internal/rootcoord/root_coord.go
@@ -531,9 +531,13 @@ func (c *Core) Init() error {
func (c *Core) initCredentials() error {
credInfo, _ := c.meta.GetCredential(util.UserRoot)
if credInfo == nil {
- log.Debug("RootCoord init user root")
- encryptedRootPassword, _ := crypto.PasswordEncrypt(Params.CommonCfg.DefaultRootPassword.GetValue())
- err := c.meta.AddCredential(&internalpb.CredentialInfo{Username: util.UserRoot, EncryptedPassword: encryptedRootPassword})
+ encryptedRootPassword, err := crypto.PasswordEncrypt(Params.CommonCfg.DefaultRootPassword.GetValue())
+ if err != nil {
+ log.Warn("RootCoord init user root failed", zap.Error(err))
+ return err
+ }
+ log.Info("RootCoord init user root")
+ err = c.meta.AddCredential(&internalpb.CredentialInfo{Username: util.UserRoot, EncryptedPassword: encryptedRootPassword})
return err
}
return nil
diff --git a/internal/rootcoord/task_test.go b/internal/rootcoord/task_test.go
index f2ee5d1bbe5c6..ed4522c97a091 100644
--- a/internal/rootcoord/task_test.go
+++ b/internal/rootcoord/task_test.go
@@ -169,7 +169,7 @@ func TestGetLockerKey(t *testing.T) {
},
}
key := tt.GetLockerKey()
- assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true")
+ assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true")
})
t.Run("create database task locker key", func(t *testing.T) {
tt := &createDatabaseTask{
@@ -277,7 +277,7 @@ func TestGetLockerKey(t *testing.T) {
},
}
key := tt.GetLockerKey()
- assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true")
+ assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true")
})
t.Run("drop database task locker key", func(t *testing.T) {
tt := &dropDatabaseTask{
diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go
index 9485129f891e4..ad037f6b4dc4f 100644
--- a/internal/util/pipeline/stream_pipeline.go
+++ b/internal/util/pipeline/stream_pipeline.go
@@ -33,7 +33,7 @@ import (
type StreamPipeline interface {
Pipeline
- ConsumeMsgStream(position *msgpb.MsgPosition) error
+ ConsumeMsgStream(ctx context.Context, position *msgpb.MsgPosition) error
}
type streamPipeline struct {
@@ -63,7 +63,7 @@ func (p *streamPipeline) work() {
}
}
-func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
+func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.MsgPosition) error {
var err error
if position == nil {
log.Error("seek stream to nil position")
@@ -71,7 +71,7 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
}
start := time.Now()
- p.input, err = p.dispatcher.Register(context.TODO(), p.vChannel, position, common.SubscriptionPositionUnknown)
+ p.input, err = p.dispatcher.Register(ctx, p.vChannel, position, common.SubscriptionPositionUnknown)
if err != nil {
log.Error("dispatcher register failed", zap.String("channel", position.ChannelName))
return WrapErrRegDispather(err)
diff --git a/internal/util/pipeline/stream_pipeline_test.go b/internal/util/pipeline/stream_pipeline_test.go
index 0f94bd18b52d3..8ceaf38e52194 100644
--- a/internal/util/pipeline/stream_pipeline_test.go
+++ b/internal/util/pipeline/stream_pipeline_test.go
@@ -17,6 +17,7 @@
package pipeline
import (
+ context2 "context"
"fmt"
"testing"
@@ -63,7 +64,7 @@ func (suite *StreamPipelineSuite) TestBasic() {
})
}
- err := suite.pipeline.ConsumeMsgStream(&msgpb.MsgPosition{})
+ err := suite.pipeline.ConsumeMsgStream(context2.Background(), &msgpb.MsgPosition{})
suite.NoError(err)
suite.pipeline.Start()
diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go
index 82f79560cb525..5ceeb8c036a9a 100644
--- a/pkg/util/funcutil/func.go
+++ b/pkg/util/funcutil/func.go
@@ -168,6 +168,15 @@ func GetVecFieldIDs(schema *schemapb.CollectionSchema) []int64 {
return vecFieldIDs
}
+func String2KeyValuePair(v string) ([]*commonpb.KeyValuePair, error) {
+ m := make(map[string]string)
+ err := json.Unmarshal([]byte(v), &m)
+ if err != nil {
+ return nil, err
+ }
+ return Map2KeyValuePair(m), nil
+}
+
func Map2KeyValuePair(datas map[string]string) []*commonpb.KeyValuePair {
results := make([]*commonpb.KeyValuePair, len(datas))
offset := 0
diff --git a/pkg/util/funcutil/func_test.go b/pkg/util/funcutil/func_test.go
index ed9c2555f54a3..0201e5b01ea27 100644
--- a/pkg/util/funcutil/func_test.go
+++ b/pkg/util/funcutil/func_test.go
@@ -838,3 +838,24 @@ func (s *NumRowsWithSchemaSuite) TestErrorCases() {
func TestNumRowsWithSchema(t *testing.T) {
suite.Run(t, new(NumRowsWithSchemaSuite))
}
+
+func TestString2KeyValuePair(t *testing.T) {
+ t.Run("normal", func(t *testing.T) {
+ kvs, err := String2KeyValuePair("{\"key\": \"value\"}")
+ assert.NoError(t, err)
+ assert.Len(t, kvs, 1)
+ assert.Equal(t, "key", kvs[0].Key)
+ assert.Equal(t, "value", kvs[0].Value)
+ })
+
+ t.Run("err", func(t *testing.T) {
+ _, err := String2KeyValuePair("{aa}")
+ assert.Error(t, err)
+ })
+
+ t.Run("empty", func(t *testing.T) {
+ kvs, err := String2KeyValuePair("{}")
+ assert.NoError(t, err)
+ assert.Len(t, kvs, 0)
+ })
+}
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index a03267de7fb3a..57359bf6b3f0c 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -653,8 +653,8 @@ like the old password verification when updating the credential`,
p.DefaultRootPassword = ParamItem{
Key: "common.security.defaultRootPassword",
Version: "2.4.7",
- Doc: "default password for root user",
- DefaultValue: "Milvus",
+ Doc: "default password for root user. The maximum length is 72 characters, and double quotes are required.",
+ DefaultValue: "\"Milvus\"",
Export: true,
}
p.DefaultRootPassword.Init(base.mgr)
@@ -1146,6 +1146,7 @@ type rootCoordConfig struct {
MaxGeneralCapacity ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"`
UseLockScheduler ParamItem `refreshable:"true"`
+ DefaultDBProperties ParamItem `refreshable:"false"`
}
func (p *rootCoordConfig) init(base *BaseTable) {
@@ -1229,6 +1230,15 @@ Segments with smaller size than this parameter will not be indexed, and will be
Export: false,
}
p.UseLockScheduler.Init(base.mgr)
+
+ p.DefaultDBProperties = ParamItem{
+ Key: "rootCoord.defaultDBProperties",
+ Version: "2.4.16",
+ DefaultValue: "{}",
+ Doc: "default db properties, should be a json string",
+ Export: false,
+ }
+ p.DefaultDBProperties.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
@@ -1356,8 +1366,15 @@ func (p *proxyConfig) init(base *BaseTable) {
p.MaxPasswordLength = ParamItem{
Key: "proxy.maxPasswordLength",
- DefaultValue: "256",
+ DefaultValue: "72", // bcrypt max length
Version: "2.0.0",
+ Formatter: func(v string) string {
+ n := getAsInt(v)
+ if n <= 0 || n > 72 {
+ return "72"
+ }
+ return v
+ },
PanicIfEmpty: true,
}
p.MaxPasswordLength.Init(base.mgr)
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index d691565287b49..4656de6fda473 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -153,6 +153,10 @@ func TestComponentParam(t *testing.T) {
params.Save("rootCoord.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
+ assert.Equal(t, "{}", Params.DefaultDBProperties.GetValue())
+ params.Save("rootCoord.defaultDBProperties", "{\"key\":\"value\"}")
+ assert.Equal(t, "{\"key\":\"value\"}", Params.DefaultDBProperties.GetValue())
+
SetCreateTime(time.Now())
SetUpdateTime(time.Now())
})
@@ -215,6 +219,12 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, int64(10), Params.CheckWorkloadRequestNum.GetAsInt64())
assert.Equal(t, float64(0.1), Params.WorkloadToleranceFactor.GetAsFloat())
+
+ assert.Equal(t, 72, Params.MaxPasswordLength.GetAsInt())
+ params.Save("proxy.maxPasswordLength", "100")
+ assert.Equal(t, 72, Params.MaxPasswordLength.GetAsInt())
+ params.Save("proxy.maxPasswordLength", "-10")
+ assert.Equal(t, 72, Params.MaxPasswordLength.GetAsInt())
})
// t.Run("test proxyConfig panic", func(t *testing.T) {
diff --git a/pkg/util/retry/retry.go b/pkg/util/retry/retry.go
index eeb9115cfea19..c623bb1dbeb4d 100644
--- a/pkg/util/retry/retry.go
+++ b/pkg/util/retry/retry.go
@@ -47,19 +47,34 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
}
if !IsRecoverable(err) {
- if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) && lastErr != nil {
+ isContextErr := errors.IsAny(err, context.Canceled, context.DeadlineExceeded)
+			log.Warn("retry func failed, error not recoverable",
+ zap.Uint("retried", i),
+ zap.Uint("attempt", c.attempts),
+ zap.Bool("isContextErr", isContextErr),
+ )
+ if isContextErr && lastErr != nil {
return lastErr
}
return err
}
if c.isRetryErr != nil && !c.isRetryErr(err) {
+			log.Warn("retry func failed, error not retryable",
+ zap.Uint("retried", i),
+ zap.Uint("attempt", c.attempts),
+ )
return err
}
deadline, ok := ctx.Deadline()
if ok && time.Until(deadline) < c.sleep {
- // to avoid sleep until ctx done
- if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) && lastErr != nil {
+ isContextErr := errors.IsAny(err, context.Canceled, context.DeadlineExceeded)
+ log.Warn("retry func failed, deadline",
+ zap.Uint("retried", i),
+ zap.Uint("attempt", c.attempts),
+ zap.Bool("isContextErr", isContextErr),
+ )
+ if isContextErr && lastErr != nil {
return lastErr
}
return err
@@ -70,6 +85,10 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
select {
case <-time.After(c.sleep):
case <-ctx.Done():
+ log.Warn("retry func failed, ctx done",
+ zap.Uint("retried", i),
+ zap.Uint("attempt", c.attempts),
+ )
return lastErr
}
@@ -81,6 +100,11 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
return nil
}
}
+ if lastErr != nil {
+ log.Warn("retry func failed, reach max retry",
+ zap.Uint("attempt", c.attempts),
+ )
+ }
return lastErr
}
diff --git a/tests/python_client/milvus_client/test_milvus_client_partition.py b/tests/python_client/milvus_client/test_milvus_client_partition.py
index 152d5aa0377eb..a31f9d35410c8 100644
--- a/tests/python_client/milvus_client/test_milvus_client_partition.py
+++ b/tests/python_client/milvus_client/test_milvus_client_partition.py
@@ -554,7 +554,7 @@ def test_milvus_client_release_partition_name_lists_not_all_exists(self):
client_w.release_partitions(client, collection_name, partition_names,
check_task=CheckTasks.err_res, check_items=error)
- @pytest.mark.tags(CaseLabel.L1)
+ @pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_release_not_exist_partition_name(self):
"""
target: test fast release partition -- invalid partition name type