From 17fca6d65cebf12121abcc151f5dc61268b2ad46 Mon Sep 17 00:00:00 2001 From: "wenhui.zhang" Date: Fri, 23 Aug 2024 19:55:50 +0800 Subject: [PATCH] migration data to origin partition --- core/data/process_info.go | 2 +- core/dbclient/cus_field_milvus2x.go | 2 +- core/dbclient/milvus2x.go | 10 +-- core/dumper/mode_starter_milvus2x.go | 73 +++++++++++++++++-- core/loader/cus_milvus2x_loader.go | 4 +- core/reader/source/milvus2x_source.go | 35 ++++++--- .../milvus2x/convert/milvus2x_convert.go | 1 + core/type/milvus2xtype/mlv2x_type.go | 2 + storage/milvus2x/base.go | 8 +- storage/milvus2x/milvus2_3_ver.go | 30 ++------ 10 files changed, 118 insertions(+), 49 deletions(-) diff --git a/core/data/process_info.go b/core/data/process_info.go index 37a8fdb..3210bff 100644 --- a/core/data/process_info.go +++ b/core/data/process_info.go @@ -66,7 +66,7 @@ func (p *ProcessHandler) SetLoadTotalSize(totalSize int64) { } func (p *ProcessHandler) AddLoadSize(increment int, ctx context.Context) { p.LoadFinishSize.Add(int64(increment)) - log.LL(ctx).Info("=================>JobProcess!", zap.Int("Percent", p.CalcProcess())) + log.LL(ctx).Info("=================>JobProcess!!", zap.Int("Percent", p.CalcProcess())) } func (p *ProcessHandler) CalcProcess() int { diff --git a/core/dbclient/cus_field_milvus2x.go b/core/dbclient/cus_field_milvus2x.go index 4263f89..ad80de5 100644 --- a/core/dbclient/cus_field_milvus2x.go +++ b/core/dbclient/cus_field_milvus2x.go @@ -86,7 +86,7 @@ func (cus *CustomFieldMilvus2x) createCollection(ctx context.Context, collection } err := cus.Milvus2x.milvus.CreatePartition(ctx, collectionInfo.Param.CollectionName, partition.Name) if err != nil { - log.Error("Create custom field milvus2x Collection Partition error", zap.String("collection", collectionInfo.Param.CollectionName), + log.Error("Create custom field milvus2x Collection CurrPartition error", zap.String("collection", collectionInfo.Param.CollectionName), zap.String("partitionName", partition.Name), zap.Error(err)) return err } diff --git a/core/dbclient/milvus2x.go b/core/dbclient/milvus2x.go index adde244..dae916b 100644 --- a/core/dbclient/milvus2x.go +++ b/core/dbclient/milvus2x.go @@ -203,7 +203,7 @@ func (this *Milvus2x) ShowCollectionRows(ctx context.Context, collections []stri // print or not if print { - log.LL(ctx).Info("[Milvus2x] Collection Static:", zap.String("collection", col), + log.LL(ctx).Info("[Milvus2x] Target Collection Static:", zap.String("collection", col), zap.Int("rowCount", count)) } } @@ -273,21 +273,21 @@ func (this *Milvus2x) WaitBulkLoadSuccess(ctx context.Context, taskId int64) err } func (this *Milvus2x) StartBatchInsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error { - _, err := this.milvus.Insert(ctx, collection, "", data.Columns...) + _, err := this.milvus.Insert(ctx, collection, data.Partition, data.Columns...) if err != nil { log.L().Info("[Loader] BatchInsert return err", zap.Error(err)) return err } - log.LL(ctx).Info("[Loader] success to BatchInsert to Milvus", zap.String("col", collection)) + log.LL(ctx).Info("[Loader] success to BatchInsert to Milvus", zap.String("col", collection), zap.String("partition", data.Partition)) return nil } func (this *Milvus2x) StartBatchUpsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error { - _, err := this.milvus.Upsert(ctx, collection, "", data.Columns...) + _, err := this.milvus.Upsert(ctx, collection, data.Partition, data.Columns...) if err != nil { log.L().Info("[Loader] BatchUpsert return err", zap.Error(err)) return err } - log.LL(ctx).Info("[Loader] success to BatchUpsert to Milvus", zap.String("col", collection)) + log.LL(ctx).Info("[Loader] success to BatchUpsert to Milvus", zap.String("col", collection), zap.String("partition", data.Partition)) return nil } diff --git a/core/dumper/mode_starter_milvus2x.go b/core/dumper/mode_starter_milvus2x.go index cc11f87..40edd42 100644 --- a/core/dumper/mode_starter_milvus2x.go +++ b/core/dumper/mode_starter_milvus2x.go @@ -2,6 +2,7 @@ package dumper import ( "context" + "github.com/zilliztech/milvus-migration/core/common" "github.com/zilliztech/milvus-migration/core/gstore" "github.com/zilliztech/milvus-migration/core/meta" "github.com/zilliztech/milvus-migration/core/reader/source" @@ -42,18 +43,80 @@ func (dp *Dumper) WorkInMilvus2x(ctx context.Context, collCfg *milvus2xtype.Coll func (dp *Dumper) ReadData2Channel(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, dataChannel chan *milvus2x.Milvus2xData) error { source := source.NewMilvus2xSource(collCfg, dp.cfg, dataChannel) - data, err := source.ReadFirst(ctx) + + count, err := source.Count(ctx, collCfg) if err != nil { return err } - + collCfg.Rows = count //设置进度相关信息:dump & load 总数量 gstore.GetProcessHandler(dp.jobId).SetDumpTotalSize(collCfg.Rows) gstore.GetProcessHandler(dp.jobId).SetLoadTotalSize(collCfg.Rows) + + partitionNames := getPartitionNames(collCfg) + fieldNames := getIteratorFields(collCfg) + source.FieldNames = fieldNames + log.Info("start iterator milvus collection", zap.String("collection", collCfg.Collection), + zap.Int("BatchSize", source.BatchSize), zap.Int64("CollectionRow", count), zap.Any("PartitionName", partitionNames)) + log.Info("start iterator milvus collection", zap.Any("migration fieldName", fieldNames)) + log.Info("start iterator milvus collection", zap.Any("migration milvusCfg", collCfg.MilvusCfg)) + log.Info("start iterator milvus collection", zap.Any("migration fields", collCfg.Fields)) + + for _, partition := range partitionNames { + source.CurrPartition = partition + dataIsEmpty, err := dp.readFirstData(ctx, source) + if err != nil { + return err + } + if dataIsEmpty { + continue + } + err = dp.LoopReadData(ctx, source) + if err != nil { + return err + } + } + return source.Close() +} + +func getIteratorFields(collCfg *milvus2xtype.CollectionCfg) []string { + fieldNames := make([]string, 0, len(collCfg.Fields)) + for _, fieldCfg := range collCfg.Fields { + if collCfg.MilvusCfg.AutoId == "true" && fieldCfg.PK { + continue + } + fieldNames = append(fieldNames, fieldCfg.Name) + } + if collCfg.DynamicField { + fieldNames = append(fieldNames, common.MILVUS_META_FD) //把source 动态列也查出来 + } + return fieldNames +} + +func getPartitionNames(collCfg *milvus2xtype.CollectionCfg) []string { + partitionNames := make([]string, 0) + if collCfg.Partitions == nil { + partitionNames = append(partitionNames, common.EMPTY) + } else { + for _, p := range collCfg.Partitions { + partitionNames = append(partitionNames, p.Name) + } + } + return partitionNames +} + +func (dp *Dumper) readFirstData(ctx context.Context, source *source.Milvus2xSource) (bool, error) { + data, err := source.ReadFirst(ctx) + if err != nil { + return false, err + } + //某个partition数据为空,返回true + if data == nil { + return true, nil + } //已完成dump数量 gstore.GetProcessHandler(dp.jobId).AddDumpedSize(data.Columns[0].Len(), ctx) - - return dp.LoopReadData(ctx, source) + return false, nil } func (dp *Dumper) LoopReadData(ctx context.Context, source *source.Milvus2xSource) error { @@ -64,12 +127,10 @@ func (dp *Dumper) LoopReadData(ctx context.Context, source *source.Milvus2xSourc for !data.IsEmpty { gstore.GetProcessHandler(dp.jobId).AddDumpedSize(data.Columns[0].Len(), ctx) - data, err = source.ReadNext(ctx) if err != nil { return err } } - source.Close() return nil } diff --git a/core/loader/cus_milvus2x_loader.go b/core/loader/cus_milvus2x_loader.go index 5fdc8ab..56bb113 100644 --- a/core/loader/cus_milvus2x_loader.go +++ b/core/loader/cus_milvus2x_loader.go @@ -123,7 +123,9 @@ func (this *CustomMilvus2xLoader) compareResult(ctx context.Context) error { func (this *CustomMilvus2xLoader) BatchWrite(ctx context.Context, data *milvus2x.Milvus2xData) error { - log.LL(ctx).Info("[Loader] Begin to batchWrite data to milvus", zap.String("collection", this.runtimeCollectionNames[0])) + log.LL(ctx).Info("[Loader] Begin to batchWrite data to milvus", zap.String("collection", + this.runtimeCollectionNames[0]), zap.String("partition", data.Partition)) + if this.cfg.TargetMilvus2xCfg.WriteMode == common.UPSERT { return this.CusMilvus2x.StartBatchUpsert(ctx, this.runtimeCollectionNames[0], data) } else { diff --git a/core/reader/source/milvus2x_source.go b/core/reader/source/milvus2x_source.go index 913e584..6144f61 100644 --- a/core/reader/source/milvus2x_source.go +++ b/core/reader/source/milvus2x_source.go @@ -2,7 +2,6 @@ package source import ( "context" - "errors" "github.com/zilliztech/milvus-migration/core/config" "github.com/zilliztech/milvus-migration/core/factory/milvus2x_factory" "github.com/zilliztech/milvus-migration/core/type/milvus2xtype" @@ -14,12 +13,14 @@ import ( var DefaultSize = 100 type Milvus2xSource struct { - Cfg *config.Milvus2xConfig - CollCfg *milvus2xtype.CollectionCfg - Cli *milvus2x.Milvus2xClient - ScrollId string - BatchSize int - DataChannel chan *milvus2x.Milvus2xData + Cfg *config.Milvus2xConfig + CollCfg *milvus2xtype.CollectionCfg + Cli *milvus2x.Milvus2xClient + ScrollId string + BatchSize int + DataChannel chan *milvus2x.Milvus2xData + CurrPartition string + FieldNames []string } func NewMilvus2xSource(collCfg *milvus2xtype.CollectionCfg, dpCfg *config.MigrationConfig, dataChannel chan *milvus2x.Milvus2xData) *Milvus2xSource { @@ -39,7 +40,8 @@ func NewMilvus2xSource(collCfg *milvus2xtype.CollectionCfg, dpCfg *config.Migrat } func (milvus2xSource *Milvus2xSource) ReadFirst(ctx context.Context) (*milvus2x.Milvus2xData, error) { - err := milvus2xSource.Cli.VerCli.InitIterator(ctx, milvus2xSource.CollCfg, milvus2xSource.BatchSize) + err := milvus2xSource.Cli.VerCli.InitIterator(ctx, milvus2xSource.CollCfg, + milvus2xSource.BatchSize, milvus2xSource.CurrPartition, milvus2xSource.FieldNames) if err != nil { return nil, err } @@ -48,10 +50,14 @@ func (milvus2xSource *Milvus2xSource) ReadFirst(ctx context.Context) (*milvus2x. return nil, err } if data.IsEmpty { - return nil, errors.New("milvus2x collection data is empty") + log.Info("milvus2x collection partition data is empty", zap.Any("Partition", milvus2xSource.CurrPartition)) + return nil, nil } - log.Info("milvus2x dumpMilvusData", zap.Any("columnCount", len(data.Columns))) + log.Info("milvus2x dumpMilvusData", zap.Any("columnCount", len(data.Columns)), + zap.Any("Partition", milvus2xSource.CurrPartition)) + milvus2xSource.removePKColIfOpenAutoId(data) + data.Partition = milvus2xSource.CurrPartition milvus2xSource.DataChannel <- data return data, nil } @@ -74,6 +80,7 @@ func (milvus2xSource *Milvus2xSource) ReadNext(ctx context.Context) (*milvus2x.M } if !data.IsEmpty { milvus2xSource.removePKColIfOpenAutoId(data) + data.Partition = milvus2xSource.CurrPartition milvus2xSource.DataChannel <- data } return data, nil @@ -88,3 +95,11 @@ func (milvus2xSource *Milvus2xSource) Close() error { //close(milvus2xSource.DataChannel) return nil } + +func (milvus2xSource *Milvus2xSource) Count(ctx context.Context, cfg *milvus2xtype.CollectionCfg) (int64, error) { + count, err := milvus2xSource.Cli.VerCli.Count(ctx, cfg) + if err != nil { + return 0, err + } + return count, nil +} diff --git a/core/transform/milvus2x/convert/milvus2x_convert.go b/core/transform/milvus2x/convert/milvus2x_convert.go index cf8e30f..de53409 100644 --- a/core/transform/milvus2x/convert/milvus2x_convert.go +++ b/core/transform/milvus2x/convert/milvus2x_convert.go @@ -30,6 +30,7 @@ func ToMilvusParam(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, mil if err != nil { return nil, err } + collCfg.Partitions = partitions } log.Info("milvus2x source collection_schema", zap.Bool("DynamicFieldStatus", collCfg.DynamicField), diff --git a/core/type/milvus2xtype/mlv2x_type.go b/core/type/milvus2xtype/mlv2x_type.go index 9a9825c..88eb2b6 100644 --- a/core/type/milvus2xtype/mlv2x_type.go +++ b/core/type/milvus2xtype/mlv2x_type.go @@ -1,6 +1,7 @@ package milvus2xtype import ( + "github.com/milvus-io/milvus-sdk-go/v2/entity" "github.com/zilliztech/milvus-migration/core/type/milvustype" ) @@ -15,6 +16,7 @@ type CollectionCfg struct { Fields []FieldCfg `json:"fields"` MilvusCfg *milvustype.MilvusCfg `json:"milvus"` + Partitions []*entity.Partition DynamicField bool //source collection Dynamic Field status, if it open, will sync $meta field data to target collection } diff --git a/storage/milvus2x/base.go b/storage/milvus2x/base.go index 286a7bf..0a2895a 100644 --- a/storage/milvus2x/base.go +++ b/storage/milvus2x/base.go @@ -12,7 +12,8 @@ import ( const VER_2_3 = "2.3" type Milvus2xVersClient interface { - InitIterator(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, batchSize int) error + Count(ctx context.Context, collCfg *milvus2xtype.CollectionCfg) (int64, error) + InitIterator(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, batchSize int, partition string, fieldNames []string) error IterateNext(ctx context.Context) (*Milvus2xData, error) Close() error DescCollection(ctx context.Context, collectionName string) (*entity.Collection, error) @@ -20,8 +21,9 @@ type Milvus2xVersClient interface { } type Milvus2xData struct { - Columns []entity.Column - IsEmpty bool + Columns []entity.Column + IsEmpty bool + Partition string } type Milvus2xClient struct { diff --git a/storage/milvus2x/milvus2_3_ver.go b/storage/milvus2x/milvus2_3_ver.go index f03da3e..7bdf0c9 100644 --- a/storage/milvus2x/milvus2_3_ver.go +++ b/storage/milvus2x/milvus2_3_ver.go @@ -34,31 +34,17 @@ func NewMilvus23VerCli(Milvus2xConfig *config.Milvus2xConfig) (Milvus2xVersClien return verCli, nil } -func (milvus23 *Milvus23VerClient) InitIterator(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, batchSize int) error { - - count, err := milvus23.Count(ctx, collCfg) - if err != nil { - return err - } - collCfg.Rows = count +func (milvus23 *Milvus23VerClient) InitIterator(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, + batchSize int, partition string, fieldNames []string) error { log.Info("start iterator milvus collection", zap.String("collection", collCfg.Collection), - zap.Int("BatchSize", batchSize), zap.Int64("CollectionRow", count)) - fieldNames := make([]string, 0, len(collCfg.Fields)) - for _, fieldCfg := range collCfg.Fields { - if collCfg.MilvusCfg.AutoId == "true" && fieldCfg.PK { - continue - } - fieldNames = append(fieldNames, fieldCfg.Name) - } - if collCfg.DynamicField { - fieldNames = append(fieldNames, common.MILVUS_META_FD) //把source 动态列也查出来 + zap.Int("BatchSize", batchSize), zap.String("CurrPartition", partition)) + var iteratorParam *client.QueryIteratorOption + if partition != common.EMPTY { + iteratorParam = client.NewQueryIteratorOption(collCfg.Collection).WithBatchSize(batchSize).WithExpr(common.EMPTY).WithPartitions(partition).WithOutputFields(fieldNames...) + } else { + iteratorParam = client.NewQueryIteratorOption(collCfg.Collection).WithBatchSize(batchSize).WithExpr(common.EMPTY).WithOutputFields(fieldNames...) } - - log.Info("start iterator milvus collection", zap.Any("migration fieldName", fieldNames)) - log.Info("start iterator milvus collection", zap.Any("migration milvusCfg", collCfg.MilvusCfg)) - log.Info("start iterator milvus collection", zap.Any("migration fields", collCfg.Fields)) - iteratorParam := client.NewQueryIteratorOption(collCfg.Collection).WithBatchSize(batchSize).WithExpr(common.EMPTY).WithOutputFields(fieldNames...) //iteratorParam := client.NewQueryIteratorOption(collCfg.Collection).WithBatchSize(batchSize).WithExpr(common.EMPTY).WithOutputFields("*") iterator, err := milvus23._milvus.QueryIterator(ctx, iteratorParam) if err != nil {