diff --git a/core/common/constant.go b/core/common/constant.go index 7b674de..c805f5e 100644 --- a/core/common/constant.go +++ b/core/common/constant.go @@ -62,3 +62,4 @@ const SUB_FILE_SIZE = 1024 * 1024 * 300 const UPSERT = "upsert" const MILVUS_META_FD = "$meta" +const DEFAULT_PARTITION_NAME = "_default" diff --git a/core/common/param.go b/core/common/param.go index f524809..fe82dde 100644 --- a/core/common/param.go +++ b/core/common/param.go @@ -27,6 +27,8 @@ type CollectionParam struct { FileMapKey string } type CollectionInfo struct { - Param *CollectionParam - Fields []*entity.Field + Param *CollectionParam + Fields []*entity.Field + Partitions []*entity.Partition + PartitionKey string } diff --git a/core/dbclient/cus_field_milvus2x.go b/core/dbclient/cus_field_milvus2x.go index b783fd2..4263f89 100644 --- a/core/dbclient/cus_field_milvus2x.go +++ b/core/dbclient/cus_field_milvus2x.go @@ -56,6 +56,8 @@ func (cus *CustomFieldMilvus2x) createCollection(ctx context.Context, collection zap.Any("fields", collectionInfo.Fields), zap.Bool("dynamicField", collectionInfo.Param.EnableDynamicField), zap.Bool("autoId", collectionInfo.Param.AutoId), + zap.String("partitionKey", collectionInfo.PartitionKey), + zap.Any("partitions", collectionInfo.Partitions), zap.String("description", collectionInfo.Param.Description)) // schema schema := &entity.Schema{ @@ -77,6 +79,19 @@ func (cus *CustomFieldMilvus2x) createCollection(ctx context.Context, collection zap.String("collection", collectionInfo.Param.CollectionName), zap.Error(err)) return err } + if collectionInfo.PartitionKey == "" && collectionInfo.Partitions != nil { + for _, partition := range collectionInfo.Partitions { + if partition.Name == common.DEFAULT_PARTITION_NAME { + continue + } + err := cus.Milvus2x.milvus.CreatePartition(ctx, collectionInfo.Param.CollectionName, partition.Name) + if err != nil { + log.Error("Create custom field milvus2x Collection Partition error", zap.String("collection", collectionInfo.Param.CollectionName), + zap.String("partitionName", partition.Name), zap.Error(err)) + return err + } + } + } return nil } diff --git a/core/transform/milvus2x/convert/milvus2x_convert.go b/core/transform/milvus2x/convert/milvus2x_convert.go index 3b6fbd2..cf8e30f 100644 --- a/core/transform/milvus2x/convert/milvus2x_convert.go +++ b/core/transform/milvus2x/convert/milvus2x_convert.go @@ -21,8 +21,19 @@ func ToMilvusParam(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, mil //当source是开启动态列表,并且 target也打开动态列属性,则动态列需要迁移(DynamicField=true) collCfg.DynamicField = srcCollEntity.Schema.EnableDynamicField && !collCfg.MilvusCfg.CloseDynamicField + //partition key + partitionKey := getPartitionKey(srcCollEntity) + //partition + var partitions []*entity.Partition = nil + if partitionKey == "" { + partitions, err = milvus2xCli.VerCli.ShowPartitions(ctx, collCfg.Collection) + if err != nil { + return nil, err + } + } + log.Info("milvus2x source collection_schema", zap.Bool("DynamicFieldStatus", collCfg.DynamicField), - zap.String("Collection", collCfg.Collection)) + zap.String("Collection", collCfg.Collection), zap.String("PartitionKey", partitionKey), zap.Any("Partitions", partitions)) fields, err := ToMilvusFields(srcCollEntity, collCfg) if err != nil { @@ -54,7 +65,17 @@ func ToMilvusParam(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, mil if err != nil { return nil, err } - return &common.CollectionInfo{Param: param, Fields: fields}, err + + return &common.CollectionInfo{Param: param, Fields: fields, Partitions: partitions, PartitionKey: partitionKey}, err +} + +func getPartitionKey(collEntity *entity.Collection) string { + for _, field := range collEntity.Schema.Fields { + if field.IsPartitionKey { + return field.Name + } + } + return common.EMPTY } func GetMilvusConsistencyLevel(collCfg *milvus2xtype.CollectionCfg, collEntity *entity.Collection) (*entity.ConsistencyLevel, error) { diff --git a/storage/milvus2x/base.go b/storage/milvus2x/base.go index 7a7736f..286a7bf 100644 --- a/storage/milvus2x/base.go +++ b/storage/milvus2x/base.go @@ -16,6 +16,7 @@ type Milvus2xVersClient interface { IterateNext(ctx context.Context) (*Milvus2xData, error) Close() error DescCollection(ctx context.Context, collectionName string) (*entity.Collection, error) + ShowPartitions(ctx context.Context, collectionName string) ([]*entity.Partition, error) } type Milvus2xData struct { diff --git a/storage/milvus2x/milvus2_3_ver.go b/storage/milvus2x/milvus2_3_ver.go index e27898d..f03da3e 100644 --- a/storage/milvus2x/milvus2_3_ver.go +++ b/storage/milvus2x/milvus2_3_ver.go @@ -134,6 +134,14 @@ func (milvus23 *Milvus23VerClient) DescCollection(ctx context.Context, collectio return collEntity, nil } +func (milvus23 *Milvus23VerClient) ShowPartitions(ctx context.Context, collectionName string) ([]*entity.Partition, error) { + partition, err := milvus23._milvus.ShowPartitions(ctx, collectionName) + if err != nil { + return nil, err + } + return partition, nil +} + // 这里统一给source创建milvus client, 和target区分开 func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, error) {