Skip to content
This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

Commit

Permalink
Merge pull request #103 from zilliztech/feature_support_partition
Browse files Browse the repository at this point in the history
migration data to origin partition
  • Loading branch information
wenhuiZilliz authored Aug 23, 2024
2 parents 3db4700 + 17fca6d commit a515da6
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 49 deletions.
2 changes: 1 addition & 1 deletion core/data/process_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *ProcessHandler) SetLoadTotalSize(totalSize int64) {
}
// AddLoadSize adds increment to the LoadFinishSize counter and then logs the
// percentage returned by CalcProcess through log.LL(ctx).
func (p *ProcessHandler) AddLoadSize(increment int, ctx context.Context) {
p.LoadFinishSize.Add(int64(increment))
log.LL(ctx).Info("=================>JobProcess!", zap.Int("Percent", p.CalcProcess()))
log.LL(ctx).Info("=================>JobProcess!!", zap.Int("Percent", p.CalcProcess()))
}

func (p *ProcessHandler) CalcProcess() int {
Expand Down
2 changes: 1 addition & 1 deletion core/dbclient/cus_field_milvus2x.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (cus *CustomFieldMilvus2x) createCollection(ctx context.Context, collection
}
err := cus.Milvus2x.milvus.CreatePartition(ctx, collectionInfo.Param.CollectionName, partition.Name)
if err != nil {
log.Error("Create custom field milvus2x Collection Partition error", zap.String("collection", collectionInfo.Param.CollectionName),
log.Error("Create custom field milvus2x Collection CurrPartition error", zap.String("collection", collectionInfo.Param.CollectionName),
zap.String("partitionName", partition.Name), zap.Error(err))
return err
}
Expand Down
10 changes: 5 additions & 5 deletions core/dbclient/milvus2x.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (this *Milvus2x) ShowCollectionRows(ctx context.Context, collections []stri

// print or not
if print {
log.LL(ctx).Info("[Milvus2x] Collection Static:", zap.String("collection", col),
log.LL(ctx).Info("[Milvus2x] Target Collection Static:", zap.String("collection", col),
zap.Int("rowCount", count))
}
}
Expand Down Expand Up @@ -273,21 +273,21 @@ func (this *Milvus2x) WaitBulkLoadSuccess(ctx context.Context, taskId int64) err
}

// StartBatchInsert writes data.Columns into collection by calling Insert on
// the wrapped milvus client. When Insert fails the error is logged and
// returned to the caller; otherwise a success line is logged and nil is
// returned.
func (this *Milvus2x) StartBatchInsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error {
_, err := this.milvus.Insert(ctx, collection, "", data.Columns...)
_, err := this.milvus.Insert(ctx, collection, data.Partition, data.Columns...)
if err != nil {
log.L().Info("[Loader] BatchInsert return err", zap.Error(err))
return err
}
log.LL(ctx).Info("[Loader] success to BatchInsert to Milvus", zap.String("col", collection))
log.LL(ctx).Info("[Loader] success to BatchInsert to Milvus", zap.String("col", collection), zap.String("partition", data.Partition))
return nil
}

// StartBatchUpsert writes data.Columns into collection by calling Upsert on
// the wrapped milvus client. When Upsert fails the error is logged and
// returned to the caller; otherwise a success line is logged and nil is
// returned.
func (this *Milvus2x) StartBatchUpsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error {
_, err := this.milvus.Upsert(ctx, collection, "", data.Columns...)
_, err := this.milvus.Upsert(ctx, collection, data.Partition, data.Columns...)
if err != nil {
log.L().Info("[Loader] BatchUpsert return err", zap.Error(err))
return err
}
log.LL(ctx).Info("[Loader] success to BatchUpsert to Milvus", zap.String("col", collection))
log.LL(ctx).Info("[Loader] success to BatchUpsert to Milvus", zap.String("col", collection), zap.String("partition", data.Partition))
return nil
}
73 changes: 67 additions & 6 deletions core/dumper/mode_starter_milvus2x.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dumper

import (
"context"
"github.com/zilliztech/milvus-migration/core/common"
"github.com/zilliztech/milvus-migration/core/gstore"
"github.com/zilliztech/milvus-migration/core/meta"
"github.com/zilliztech/milvus-migration/core/reader/source"
Expand Down Expand Up @@ -42,18 +43,80 @@ func (dp *Dumper) WorkInMilvus2x(ctx context.Context, collCfg *milvus2xtype.Coll
// ReadData2Channel creates a Milvus2xSource for collCfg that publishes to
// dataChannel, stores the row count reported by source.Count in collCfg.Rows
// and as the job's dump/load totals, and then reads the partitions returned
// by getPartitionNames one after another. A partition whose first read comes
// back empty is skipped. The first error encountered is returned; otherwise
// the result of source.Close() is returned.
func (dp *Dumper) ReadData2Channel(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, dataChannel chan *milvus2x.Milvus2xData) error {

source := source.NewMilvus2xSource(collCfg, dp.cfg, dataChannel)
data, err := source.ReadFirst(ctx)

count, err := source.Count(ctx, collCfg)
if err != nil {
return err
}

collCfg.Rows = count
// Set the progress-tracking totals: number of rows to dump & load.
gstore.GetProcessHandler(dp.jobId).SetDumpTotalSize(collCfg.Rows)
gstore.GetProcessHandler(dp.jobId).SetLoadTotalSize(collCfg.Rows)

partitionNames := getPartitionNames(collCfg)
fieldNames := getIteratorFields(collCfg)
// The same field list is reused for every partition's iterator.
source.FieldNames = fieldNames
log.Info("start iterator milvus collection", zap.String("collection", collCfg.Collection),
zap.Int("BatchSize", source.BatchSize), zap.Int64("CollectionRow", count), zap.Any("PartitionName", partitionNames))
log.Info("start iterator milvus collection", zap.Any("migration fieldName", fieldNames))
log.Info("start iterator milvus collection", zap.Any("migration milvusCfg", collCfg.MilvusCfg))
log.Info("start iterator milvus collection", zap.Any("migration fields", collCfg.Fields))

for _, partition := range partitionNames {
// Point the source at the partition to read before (re)starting iteration.
source.CurrPartition = partition
dataIsEmpty, err := dp.readFirstData(ctx, source)
if err != nil {
return err
}
if dataIsEmpty {
continue
}
err = dp.LoopReadData(ctx, source)
if err != nil {
return err
}
}
return source.Close()
}

// getIteratorFields builds the list of field names to request from the source
// collection. A primary-key field is left out when MilvusCfg.AutoId is "true",
// and common.MILVUS_META_FD is appended when collCfg.DynamicField is set so the
// source's dynamic column is queried as well.
func getIteratorFields(collCfg *milvus2xtype.CollectionCfg) []string {
	names := make([]string, 0, len(collCfg.Fields))
	for i := range collCfg.Fields {
		field := &collCfg.Fields[i]
		// AutoId is checked first, per field, exactly as before.
		skip := collCfg.MilvusCfg.AutoId == "true" && field.PK
		if !skip {
			names = append(names, field.Name)
		}
	}
	if !collCfg.DynamicField {
		return names
	}
	// Also fetch the source's dynamic column.
	return append(names, common.MILVUS_META_FD)
}

// getPartitionNames returns the partition names the dumper should iterate.
//
// When collCfg carries no partitions, a single common.EMPTY entry is returned
// so the caller still performs one read pass; InitIterator applies no
// partition filter for that value. Otherwise the Name of every entry in
// collCfg.Partitions is returned in order.
func getPartitionNames(collCfg *milvus2xtype.CollectionCfg) []string {
	// len() treats a nil slice and an empty non-nil slice alike. Testing
	// only for nil would return zero names for an empty slice, and the
	// caller's loop would then silently migrate nothing.
	if len(collCfg.Partitions) == 0 {
		return []string{common.EMPTY}
	}

	partitionNames := make([]string, 0, len(collCfg.Partitions))
	for _, p := range collCfg.Partitions {
		partitionNames = append(partitionNames, p.Name)
	}
	return partitionNames
}

// readFirstData performs the initial read for the source's current partition
// through source.ReadFirst. It reports true when ReadFirst returned nil data,
// i.e. the partition holds nothing to migrate. For a non-nil batch it adds
// the length of the batch's first column to the job's dumped size. Any error
// from ReadFirst is returned unchanged.
func (dp *Dumper) readFirstData(ctx context.Context, source *source.Milvus2xSource) (bool, error) {
data, err := source.ReadFirst(ctx)
if err != nil {
return false, err
}
// This partition has no data: report true.
if data == nil {
return true, nil
}
// Record the number of rows dumped so far.
gstore.GetProcessHandler(dp.jobId).AddDumpedSize(data.Columns[0].Len(), ctx)

return dp.LoopReadData(ctx, source)
return false, nil
}

func (dp *Dumper) LoopReadData(ctx context.Context, source *source.Milvus2xSource) error {
Expand All @@ -64,12 +127,10 @@ func (dp *Dumper) LoopReadData(ctx context.Context, source *source.Milvus2xSourc
for !data.IsEmpty {

gstore.GetProcessHandler(dp.jobId).AddDumpedSize(data.Columns[0].Len(), ctx)

data, err = source.ReadNext(ctx)
if err != nil {
return err
}
}
source.Close()
return nil
}
4 changes: 3 additions & 1 deletion core/loader/cus_milvus2x_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ func (this *CustomMilvus2xLoader) compareResult(ctx context.Context) error {

func (this *CustomMilvus2xLoader) BatchWrite(ctx context.Context, data *milvus2x.Milvus2xData) error {

log.LL(ctx).Info("[Loader] Begin to batchWrite data to milvus", zap.String("collection", this.runtimeCollectionNames[0]))
log.LL(ctx).Info("[Loader] Begin to batchWrite data to milvus", zap.String("collection",
this.runtimeCollectionNames[0]), zap.String("partition", data.Partition))

if this.cfg.TargetMilvus2xCfg.WriteMode == common.UPSERT {
return this.CusMilvus2x.StartBatchUpsert(ctx, this.runtimeCollectionNames[0], data)
} else {
Expand Down
35 changes: 25 additions & 10 deletions core/reader/source/milvus2x_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package source

import (
"context"
"errors"
"github.com/zilliztech/milvus-migration/core/config"
"github.com/zilliztech/milvus-migration/core/factory/milvus2x_factory"
"github.com/zilliztech/milvus-migration/core/type/milvus2xtype"
Expand All @@ -14,12 +13,14 @@ import (
var DefaultSize = 100

// Milvus2xSource holds the reader state for one source collection.
// CollCfg, BatchSize, CurrPartition and FieldNames are handed to the client's
// InitIterator when reading starts; CurrPartition is also copied onto every
// batch's Partition field before the batch is sent on DataChannel.
type Milvus2xSource struct {
Cfg *config.Milvus2xConfig
CollCfg *milvus2xtype.CollectionCfg
Cli *milvus2x.Milvus2xClient
ScrollId string
BatchSize int
DataChannel chan *milvus2x.Milvus2xData
Cfg *config.Milvus2xConfig
CollCfg *milvus2xtype.CollectionCfg
Cli *milvus2x.Milvus2xClient
ScrollId string
BatchSize int
DataChannel chan *milvus2x.Milvus2xData
CurrPartition string
FieldNames []string
}

func NewMilvus2xSource(collCfg *milvus2xtype.CollectionCfg, dpCfg *config.MigrationConfig, dataChannel chan *milvus2x.Milvus2xData) *Milvus2xSource {
Expand All @@ -39,7 +40,8 @@ func NewMilvus2xSource(collCfg *milvus2xtype.CollectionCfg, dpCfg *config.Migrat
}

func (milvus2xSource *Milvus2xSource) ReadFirst(ctx context.Context) (*milvus2x.Milvus2xData, error) {
err := milvus2xSource.Cli.VerCli.InitIterator(ctx, milvus2xSource.CollCfg, milvus2xSource.BatchSize)
err := milvus2xSource.Cli.VerCli.InitIterator(ctx, milvus2xSource.CollCfg,
milvus2xSource.BatchSize, milvus2xSource.CurrPartition, milvus2xSource.FieldNames)
if err != nil {
return nil, err
}
Expand All @@ -48,10 +50,14 @@ func (milvus2xSource *Milvus2xSource) ReadFirst(ctx context.Context) (*milvus2x.
return nil, err
}
if data.IsEmpty {
return nil, errors.New("milvus2x collection data is empty")
log.Info("milvus2x collection partition data is empty", zap.Any("Partition", milvus2xSource.CurrPartition))
return nil, nil
}
log.Info("milvus2x dumpMilvusData", zap.Any("columnCount", len(data.Columns)))
log.Info("milvus2x dumpMilvusData", zap.Any("columnCount", len(data.Columns)),
zap.Any("Partition", milvus2xSource.CurrPartition))

milvus2xSource.removePKColIfOpenAutoId(data)
data.Partition = milvus2xSource.CurrPartition
milvus2xSource.DataChannel <- data
return data, nil
}
Expand All @@ -74,6 +80,7 @@ func (milvus2xSource *Milvus2xSource) ReadNext(ctx context.Context) (*milvus2x.M
}
if !data.IsEmpty {
milvus2xSource.removePKColIfOpenAutoId(data)
data.Partition = milvus2xSource.CurrPartition
milvus2xSource.DataChannel <- data
}
return data, nil
Expand All @@ -88,3 +95,11 @@ func (milvus2xSource *Milvus2xSource) Close() error {
//close(milvus2xSource.DataChannel)
return nil
}

// Count asks the version-specific client for the row count of the collection
// described by cfg. On failure it returns 0 together with the client's error.
func (milvus2xSource *Milvus2xSource) Count(ctx context.Context, cfg *milvus2xtype.CollectionCfg) (int64, error) {
	verCli := milvus2xSource.Cli.VerCli
	total, err := verCli.Count(ctx, cfg)
	if err != nil {
		return 0, err
	}
	return total, nil
}
1 change: 1 addition & 0 deletions core/transform/milvus2x/convert/milvus2x_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func ToMilvusParam(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, mil
if err != nil {
return nil, err
}
collCfg.Partitions = partitions
}

log.Info("milvus2x source collection_schema", zap.Bool("DynamicFieldStatus", collCfg.DynamicField),
Expand Down
2 changes: 2 additions & 0 deletions core/type/milvus2xtype/mlv2x_type.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package milvus2xtype

import (
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/zilliztech/milvus-migration/core/type/milvustype"
)

Expand All @@ -15,6 +16,7 @@ type CollectionCfg struct {
Fields []FieldCfg `json:"fields"`
MilvusCfg *milvustype.MilvusCfg `json:"milvus"`

Partitions []*entity.Partition
DynamicField bool //source collection Dynamic Field status, if it open, will sync $meta field data to target collection
}

Expand Down
8 changes: 5 additions & 3 deletions storage/milvus2x/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ import (
const VER_2_3 = "2.3"

// Milvus2xVersClient lists the operations the migration code calls on a
// Milvus 2.x client: counting a collection's rows, initialising and advancing
// a data iterator, closing the client, describing a collection and listing
// its partitions.
type Milvus2xVersClient interface {
InitIterator(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, batchSize int) error
Count(ctx context.Context, collCfg *milvus2xtype.CollectionCfg) (int64, error)
InitIterator(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, batchSize int, partition string, fieldNames []string) error
IterateNext(ctx context.Context) (*Milvus2xData, error)
Close() error
DescCollection(ctx context.Context, collectionName string) (*entity.Collection, error)
ShowPartitions(ctx context.Context, collectionName string) ([]*entity.Partition, error)
}

// Milvus2xData is one batch handed from the source reader to the loader.
// Columns carries the batch's column data, IsEmpty marks a batch with nothing
// to process (the read loop stops on it), and Partition is the partition name
// the reader stamped on the batch, which the loader passes to Insert/Upsert.
type Milvus2xData struct {
Columns []entity.Column
IsEmpty bool
Columns []entity.Column
IsEmpty bool
Partition string
}

type Milvus2xClient struct {
Expand Down
30 changes: 8 additions & 22 deletions storage/milvus2x/milvus2_3_ver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,17 @@ func NewMilvus23VerCli(Milvus2xConfig *config.Milvus2xConfig) (Milvus2xVersClien
return verCli, nil
}

func (milvus23 *Milvus23VerClient) InitIterator(ctx context.Context, collCfg *milvus2xtype.CollectionCfg, batchSize int) error {

count, err := milvus23.Count(ctx, collCfg)
if err != nil {
return err
}
collCfg.Rows = count
func (milvus23 *Milvus23VerClient) InitIterator(ctx context.Context, collCfg *milvus2xtype.CollectionCfg,
batchSize int, partition string, fieldNames []string) error {

log.Info("start iterator milvus collection", zap.String("collection", collCfg.Collection),
zap.Int("BatchSize", batchSize), zap.Int64("CollectionRow", count))
fieldNames := make([]string, 0, len(collCfg.Fields))
for _, fieldCfg := range collCfg.Fields {
if collCfg.MilvusCfg.AutoId == "true" && fieldCfg.PK {
continue
}
fieldNames = append(fieldNames, fieldCfg.Name)
}
if collCfg.DynamicField {
fieldNames = append(fieldNames, common.MILVUS_META_FD) //把source 动态列也查出来
zap.Int("BatchSize", batchSize), zap.String("CurrPartition", partition))
var iteratorParam *client.QueryIteratorOption
if partition != common.EMPTY {
iteratorParam = client.NewQueryIteratorOption(collCfg.Collection).WithBatchSize(batchSize).WithExpr(common.EMPTY).WithPartitions(partition).WithOutputFields(fieldNames...)
} else {
iteratorParam = client.NewQueryIteratorOption(collCfg.Collection).WithBatchSize(batchSize).WithExpr(common.EMPTY).WithOutputFields(fieldNames...)
}

log.Info("start iterator milvus collection", zap.Any("migration fieldName", fieldNames))
log.Info("start iterator milvus collection", zap.Any("migration milvusCfg", collCfg.MilvusCfg))
log.Info("start iterator milvus collection", zap.Any("migration fields", collCfg.Fields))
iteratorParam := client.NewQueryIteratorOption(collCfg.Collection).WithBatchSize(batchSize).WithExpr(common.EMPTY).WithOutputFields(fieldNames...)
//iteratorParam := client.NewQueryIteratorOption(collCfg.Collection).WithBatchSize(batchSize).WithExpr(common.EMPTY).WithOutputFields("*")
iterator, err := milvus23._milvus.QueryIterator(ctx, iteratorParam)
if err != nil {
Expand Down

0 comments on commit a515da6

Please sign in to comment.