Skip to content
This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

support migration collection partions #102

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/common/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@ const SUB_FILE_SIZE = 1024 * 1024 * 300

const UPSERT = "upsert"
const MILVUS_META_FD = "$meta"
const DEFAULT_PARTITION_NAME = "_default"
6 changes: 4 additions & 2 deletions core/common/param.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type CollectionParam struct {
FileMapKey string
}
type CollectionInfo struct {
Param *CollectionParam
Fields []*entity.Field
Param *CollectionParam
Fields []*entity.Field
Partitions []*entity.Partition
PartitionKey string
}
15 changes: 15 additions & 0 deletions core/dbclient/cus_field_milvus2x.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (cus *CustomFieldMilvus2x) createCollection(ctx context.Context, collection
zap.Any("fields", collectionInfo.Fields),
zap.Bool("dynamicField", collectionInfo.Param.EnableDynamicField),
zap.Bool("autoId", collectionInfo.Param.AutoId),
zap.String("partitionKey", collectionInfo.PartitionKey),
zap.Any("partitions", collectionInfo.Partitions),
zap.String("description", collectionInfo.Param.Description))
// schema
schema := &entity.Schema{
Expand All @@ -77,6 +79,19 @@ func (cus *CustomFieldMilvus2x) createCollection(ctx context.Context, collection
zap.String("collection", collectionInfo.Param.CollectionName), zap.Error(err))
return err
}
if collectionInfo.PartitionKey == "" && collectionInfo.Partitions != nil {
for _, partition := range collectionInfo.Partitions {
if partition.Name == common.DEFAULT_PARTITION_NAME {
continue
}
err := cus.Milvus2x.milvus.CreatePartition(ctx, collectionInfo.Param.CollectionName, partition.Name)
if err != nil {
log.Error("Create custom field milvus2x Collection Partition error", zap.String("collection", collectionInfo.Param.CollectionName),
zap.String("partitionName", partition.Name), zap.Error(err))
return err
}
}
}
return nil
}

Expand Down
25 changes: 23 additions & 2 deletions core/transform/milvus2x/convert/milvus2x_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,19 @@ func ToMilvusParam(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, mil
//当source是开启动态列表,并且 target也打开动态列属性,则动态列需要迁移(DynamicField=true)
collCfg.DynamicField = srcCollEntity.Schema.EnableDynamicField && !collCfg.MilvusCfg.CloseDynamicField

//partition key
partitionKey := getPartitionKey(srcCollEntity)
//partition
var partitions []*entity.Partition = nil
if partitionKey == "" {
partitions, err = milvus2xCli.VerCli.ShowPartitions(ctx, collCfg.Collection)
if err != nil {
return nil, err
}
}

log.Info("milvus2x source collection_schema", zap.Bool("DynamicFieldStatus", collCfg.DynamicField),
zap.String("Collection", collCfg.Collection))
zap.String("Collection", collCfg.Collection), zap.String("PartitionKey", partitionKey), zap.Any("Partitions", partitions))

fields, err := ToMilvusFields(srcCollEntity, collCfg)
if err != nil {
Expand Down Expand Up @@ -54,7 +65,17 @@ func ToMilvusParam(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, mil
if err != nil {
return nil, err
}
return &common.CollectionInfo{Param: param, Fields: fields}, err

return &common.CollectionInfo{Param: param, Fields: fields, Partitions: partitions, PartitionKey: partitionKey}, err
}

func getPartitionKey(collEntity *entity.Collection) string {
for _, field := range collEntity.Schema.Fields {
if field.IsPartitionKey {
return field.Name
}
}
return common.EMPTY
}

func GetMilvusConsistencyLevel(collCfg *milvus2xtype.CollectionCfg, collEntity *entity.Collection) (*entity.ConsistencyLevel, error) {
Expand Down
1 change: 1 addition & 0 deletions storage/milvus2x/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Milvus2xVersClient interface {
IterateNext(ctx context.Context) (*Milvus2xData, error)
Close() error
DescCollection(ctx context.Context, collectionName string) (*entity.Collection, error)
ShowPartitions(ctx context.Context, collectionName string) ([]*entity.Partition, error)
}

type Milvus2xData struct {
Expand Down
8 changes: 8 additions & 0 deletions storage/milvus2x/milvus2_3_ver.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ func (milvus23 *Milvus23VerClient) DescCollection(ctx context.Context, collectio
return collEntity, nil
}

func (milvus23 *Milvus23VerClient) ShowPartitions(ctx context.Context, collectionName string) ([]*entity.Partition, error) {
partition, err := milvus23._milvus.ShowPartitions(ctx, collectionName)
if err != nil {
return nil, err
}
return partition, nil
}

// 这里统一给source创建milvus client, 和target区分开
func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, error) {

Expand Down
Loading