Skip to content
This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

Commit

Permalink
Merge pull request #102 from zilliztech/feature_support_partition
Browse files Browse the repository at this point in the history
support migration collection partions
  • Loading branch information
wenhuiZilliz authored Aug 22, 2024
2 parents eb88fa3 + c7a6188 commit 3db4700
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 4 deletions.
1 change: 1 addition & 0 deletions core/common/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@ const SUB_FILE_SIZE = 1024 * 1024 * 300

const UPSERT = "upsert"
const MILVUS_META_FD = "$meta"
const DEFAULT_PARTITION_NAME = "_default"
6 changes: 4 additions & 2 deletions core/common/param.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type CollectionParam struct {
FileMapKey string
}
type CollectionInfo struct {
Param *CollectionParam
Fields []*entity.Field
Param *CollectionParam
Fields []*entity.Field
Partitions []*entity.Partition
PartitionKey string
}
15 changes: 15 additions & 0 deletions core/dbclient/cus_field_milvus2x.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (cus *CustomFieldMilvus2x) createCollection(ctx context.Context, collection
zap.Any("fields", collectionInfo.Fields),
zap.Bool("dynamicField", collectionInfo.Param.EnableDynamicField),
zap.Bool("autoId", collectionInfo.Param.AutoId),
zap.String("partitionKey", collectionInfo.PartitionKey),
zap.Any("partitions", collectionInfo.Partitions),
zap.String("description", collectionInfo.Param.Description))
// schema
schema := &entity.Schema{
Expand All @@ -77,6 +79,19 @@ func (cus *CustomFieldMilvus2x) createCollection(ctx context.Context, collection
zap.String("collection", collectionInfo.Param.CollectionName), zap.Error(err))
return err
}
if collectionInfo.PartitionKey == "" && collectionInfo.Partitions != nil {
for _, partition := range collectionInfo.Partitions {
if partition.Name == common.DEFAULT_PARTITION_NAME {
continue
}
err := cus.Milvus2x.milvus.CreatePartition(ctx, collectionInfo.Param.CollectionName, partition.Name)
if err != nil {
log.Error("Create custom field milvus2x Collection Partition error", zap.String("collection", collectionInfo.Param.CollectionName),
zap.String("partitionName", partition.Name), zap.Error(err))
return err
}
}
}
return nil
}

Expand Down
25 changes: 23 additions & 2 deletions core/transform/milvus2x/convert/milvus2x_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,19 @@ func ToMilvusParam(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, mil
//当source是开启动态列表,并且 target也打开动态列属性,则动态列需要迁移(DynamicField=true)
collCfg.DynamicField = srcCollEntity.Schema.EnableDynamicField && !collCfg.MilvusCfg.CloseDynamicField

//partition key
partitionKey := getPartitionKey(srcCollEntity)
//partition
var partitions []*entity.Partition = nil
if partitionKey == "" {
partitions, err = milvus2xCli.VerCli.ShowPartitions(ctx, collCfg.Collection)
if err != nil {
return nil, err
}
}

log.Info("milvus2x source collection_schema", zap.Bool("DynamicFieldStatus", collCfg.DynamicField),
zap.String("Collection", collCfg.Collection))
zap.String("Collection", collCfg.Collection), zap.String("PartitionKey", partitionKey), zap.Any("Partitions", partitions))

fields, err := ToMilvusFields(srcCollEntity, collCfg)
if err != nil {
Expand Down Expand Up @@ -54,7 +65,17 @@ func ToMilvusParam(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, mil
if err != nil {
return nil, err
}
return &common.CollectionInfo{Param: param, Fields: fields}, err

return &common.CollectionInfo{Param: param, Fields: fields, Partitions: partitions, PartitionKey: partitionKey}, err
}

func getPartitionKey(collEntity *entity.Collection) string {
for _, field := range collEntity.Schema.Fields {
if field.IsPartitionKey {
return field.Name
}
}
return common.EMPTY
}

func GetMilvusConsistencyLevel(collCfg *milvus2xtype.CollectionCfg, collEntity *entity.Collection) (*entity.ConsistencyLevel, error) {
Expand Down
1 change: 1 addition & 0 deletions storage/milvus2x/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Milvus2xVersClient interface {
IterateNext(ctx context.Context) (*Milvus2xData, error)
Close() error
DescCollection(ctx context.Context, collectionName string) (*entity.Collection, error)
ShowPartitions(ctx context.Context, collectionName string) ([]*entity.Partition, error)
}

type Milvus2xData struct {
Expand Down
8 changes: 8 additions & 0 deletions storage/milvus2x/milvus2_3_ver.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ func (milvus23 *Milvus23VerClient) DescCollection(ctx context.Context, collectio
return collEntity, nil
}

func (milvus23 *Milvus23VerClient) ShowPartitions(ctx context.Context, collectionName string) ([]*entity.Partition, error) {
partition, err := milvus23._milvus.ShowPartitions(ctx, collectionName)
if err != nil {
return nil, err
}
return partition, nil
}

// 这里统一给source创建milvus client, 和target区分开
func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, error) {

Expand Down

0 comments on commit 3db4700

Please sign in to comment.