From 6d5968b7626c3346bc0861924d582947b1428a65 Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 30 Jan 2024 10:31:15 +0800 Subject: [PATCH] Add retry in backup prepare & Fix backup size lost bug (#284) * Add log to print Flush error Signed-off-by: wayblink * Add retry in backup prepare Signed-off-by: wayblink * Fix backup size lost bug Signed-off-by: wayblink --------- Signed-off-by: wayblink --- core/backup_impl_create_backup.go | 19 +++++++------- core/backup_meta.go | 42 +++++++++++++++++++------------ 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go index 6112bae9..9b1d3b36 100644 --- a/core/backup_impl_create_backup.go +++ b/core/backup_impl_create_backup.go @@ -16,6 +16,7 @@ import ( "github.com/zilliztech/milvus-backup/core/proto/backuppb" "github.com/zilliztech/milvus-backup/core/utils" "github.com/zilliztech/milvus-backup/internal/log" + "github.com/zilliztech/milvus-backup/internal/util/retry" ) func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest) *backuppb.BackupInfoResponse { @@ -300,7 +301,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo StateCode: backuppb.BackupTaskStateCode_BACKUP_INITIAL, StartTime: time.Now().Unix(), CollectionId: completeCollection.ID, - DbName: collection.db, // todo currently db_name is not used in many places + DbName: collection.db, CollectionName: completeCollection.Name, Schema: schema, ShardsNum: completeCollection.ShardNum, @@ -368,7 +369,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush))) newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), false) if err != nil { - log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName())) + log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName()), zap.Error(err)) return err } log.Info("flush segments", @@ -416,6 +417,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo // Flush segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) if err != nil { + log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName()), zap.Error(err)) return err } log.Info("GetPersistentSegmentInfo from milvus", @@ -544,7 +546,9 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup for _, collection := range toBackupCollections { collectionClone := collection job := func(ctx context.Context) error { - err := b.backupCollectionPrepare(ctx, backupInfo, collectionClone, request.GetForce()) + err := retry.Do(ctx, func() error { + return b.backupCollectionPrepare(ctx, backupInfo, collectionClone, request.GetForce()) + }, retry.Sleep(120*time.Second), retry.Attempts(128)) return err } jobId := b.getBackupCollectionWorkerPool().SubmitWithId(job) @@ -578,17 +582,12 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup return backupInfo, err } - var backupSize int64 = 0 - leveledBackupInfo, err := treeToLevel(backupInfo) + _, err := treeToLevel(backupInfo) if err != nil { backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL backupInfo.ErrorMessage = err.Error() return backupInfo, err } - for _, coll := range leveledBackupInfo.collectionLevel.GetInfos() { - backupSize += coll.GetSize() - } - backupInfo.Size = backupSize backupInfo.EndTime = time.Now().UnixNano() / int64(time.Millisecond) backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_SUCCESS } else { @@ -762,7 +761,7 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath)) fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false) if len(fieldsLogDir) == 0 { - msg := fmt.Sprint("Get empty input path, but segment should not be empty, %s", insertPath) + msg := fmt.Sprintf("Get empty input path, but segment should not be empty, %s", insertPath) return segmentBackupInfo, errors.New(msg) } if err != nil { diff --git a/core/backup_meta.go b/core/backup_meta.go index ca91ce9a..a437dbe4 100644 --- a/core/backup_meta.go +++ b/core/backup_meta.go @@ -48,8 +48,31 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) { collections := make([]*backuppb.CollectionBackupInfo, 0) partitions := make([]*backuppb.PartitionBackupInfo, 0) segments := make([]*backuppb.SegmentBackupInfo, 0) - + // recalculate backup size + var backupSize int64 = 0 for _, collectionBack := range backup.GetCollectionBackups() { + // recalculate backup size + var collectionSize int64 = 0 + for _, partitionBack := range collectionBack.GetPartitionBackups() { + // recalculate backup size + var partitionSize int64 = 0 + for _, segmentBack := range partitionBack.GetSegmentBackups() { + segments = append(segments, segmentBack) + partitionSize = partitionSize + segmentBack.GetSize() + } + partitionBack.Size = partitionSize + clonePartitionBackupInfo := &backuppb.PartitionBackupInfo{ + PartitionId: partitionBack.GetPartitionId(), + PartitionName: partitionBack.GetPartitionName(), + CollectionId: partitionBack.GetCollectionId(), + Size: partitionBack.GetSize(), + LoadState: partitionBack.GetLoadState(), + } + partitions = append(partitions, clonePartitionBackupInfo) + collectionSize = collectionSize + partitionSize + } + + collectionBack.Size = collectionSize cloneCollectionBackup := &backuppb.CollectionBackupInfo{ CollectionId: collectionBack.GetCollectionId(), DbName: collectionBack.GetDbName(), @@ -65,21 +88,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) { BackupPhysicalTimestamp: collectionBack.GetBackupPhysicalTimestamp(), } collections = append(collections, cloneCollectionBackup) - - for _, partitionBack := range collectionBack.GetPartitionBackups() { - clonePartitionBackupInfo := &backuppb.PartitionBackupInfo{ - PartitionId: partitionBack.GetPartitionId(), - PartitionName: partitionBack.GetPartitionName(), - CollectionId: partitionBack.GetCollectionId(), - Size: partitionBack.GetSize(), - LoadState: partitionBack.GetLoadState(), - } - partitions = append(partitions, clonePartitionBackupInfo) - - for _, segmentBack := range partitionBack.GetSegmentBackups() { - segments = append(segments, segmentBack) - } - } + backupSize = backupSize + collectionSize } collectionLevel := &backuppb.CollectionLevelBackupInfo{ @@ -91,6 +100,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) { segmentLevel := &backuppb.SegmentLevelBackupInfo{ Infos: segments, } + backup.Size = backupSize backupLevel := &backuppb.BackupInfo{ Id: backup.GetId(), StateCode: backup.GetStateCode(),