Support segment level restore concurrency (#341)
Signed-off-by: wayblink <[email protected]>
wayblink authored May 14, 2024
1 parent 654ddf5 commit db2cee5
Showing 3 changed files with 115 additions and 114 deletions.
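
For orientation, a minimal, self-contained sketch of the pattern this commit introduces (it is not part of the diff): instead of submitting one restore job per partition, each segment group's copy-and-bulk-insert work becomes its own job on a bounded worker pool, and the caller waits on the collected job ids. All names below (pool, SubmitWithID, restoreGroup, the sample file paths) are illustrative stand-ins, not the project's API.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// pool is a minimal stand-in for the project's restore worker pool:
// bounded concurrency, jobs tracked by id, errors collected per job.
type pool struct {
	sem    chan struct{}
	wg     sync.WaitGroup
	mu     sync.Mutex
	nextID int64
	errs   map[int64]error
}

func newPool(workers int) *pool {
	return &pool{sem: make(chan struct{}, workers), errs: make(map[int64]error)}
}

// SubmitWithID schedules job on a free worker slot and returns its id.
func (p *pool) SubmitWithID(ctx context.Context, job func(context.Context) error) int64 {
	p.mu.Lock()
	p.nextID++
	id := p.nextID
	p.mu.Unlock()

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.sem <- struct{}{}        // acquire a worker slot
		defer func() { <-p.sem }() // release it
		if err := job(ctx); err != nil {
			p.mu.Lock()
			p.errs[id] = err
			p.mu.Unlock()
		}
	}()
	return id
}

// WaitJobs blocks until submitted jobs finish and returns the first error among ids.
func (p *pool) WaitJobs(ids []int64) error {
	p.wg.Wait()
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, id := range ids {
		if err := p.errs[id]; err != nil {
			return err
		}
	}
	return nil
}

// restoreGroup stands in for copyAndBulkInsert over one segment group's files.
func restoreGroup(ctx context.Context, files []string) error {
	if len(files) == 0 {
		return errors.New("empty segment group")
	}
	fmt.Println("restoring segment group:", files)
	return nil
}

func main() {
	ctx := context.Background()
	// Hypothetical file groups, one per segment group of a partition.
	segmentGroups := [][]string{
		{"insert_log/seg1", "delta_log/seg1"},
		{"insert_log/seg2", ""}, // an empty delta path is allowed, as in the diff
	}

	p := newPool(4) // restore concurrency limit
	jobIDs := make([]int64, 0, len(segmentGroups))
	for _, g := range segmentGroups {
		files := g // capture per iteration, mirroring `files := value` in the diff
		jobIDs = append(jobIDs, p.SubmitWithID(ctx, func(ctx context.Context) error {
			return restoreGroup(ctx, files)
		}))
	}
	if err := p.WaitJobs(jobIDs); err != nil {
		fmt.Println("restore failed:", err)
	}
}

Note that the actual change keeps the per-partition pre-checks (HasPartition/CreatePartition) sequential and parallelizes only the per-group copyAndBulkInsert calls.
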
2 changes: 1 addition & 1 deletion core/backup_impl_create_backup.go
@@ -921,6 +921,6 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, levelInfo *Le
segmentBackupInfo.Deltalogs = deltaLogs
segmentBackupInfo.Size = size
b.updateSegment(levelInfo, segmentBackupInfo)
log.Info("fill segment info", zap.Int64("segId", segmentBackupInfo.GetSegmentId()), zap.Int64("size", segmentBackupInfo.GetSize()))
log.Debug("fill segment info", zap.Int64("segId", segmentBackupInfo.GetSegmentId()), zap.Int64("size", segmentBackupInfo.GetSize()))
return segmentBackupInfo, nil
}
216 changes: 104 additions & 112 deletions core/backup_impl_restore_backup.go
@@ -109,6 +109,20 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
b.cleanRestoreWorkerPool(id)
}()

//go func() {
// ticker := time.NewTicker(5 * time.Second)
// defer ticker.Stop()
// for {
// select {
// case <-ctx.Done():
// log.Info("background checking channels loop quit")
// return
// case <-ticker.C:
// log.Info("restore worker pool", zap.Int32("jobs_num", b.getRestoreWorkerPool(id).JobNum()))
// }
// }
//}()

// 2, initial restoreCollectionTasks
toRestoreCollectionBackups := make([]*backuppb.CollectionBackupInfo, 0)

@@ -558,101 +572,33 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}()

jobIds := make([]int64, 0)
for _, partitionBackup := range task.GetCollBackup().GetPartitionBackups() {
partitionBackup2 := partitionBackup
job := func(ctx context.Context) error {
log.Info("start restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup2.GetPartitionName()))
_, err := b.restorePartition(ctx, targetDBName, targetCollectionName, partitionBackup2, task, isSameBucket, backupBucketName, backupPath, tempDir)
if err != nil {
log.Error("fail to restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup2.GetPartitionName()),
zap.Error(err))
return err
}
log.Info("finish restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup2.GetPartitionName()))
return err
}
jobId := b.getRestoreWorkerPool(parentTaskID).SubmitWithId(job)
jobIds = append(jobIds, jobId)
}

err := b.getRestoreWorkerPool(parentTaskID).WaitJobs(jobIds)
return task, err
}

func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targetCollectionName string,
partitionBackup *backuppb.PartitionBackupInfo, task *backuppb.RestoreCollectionTask, isSameBucket bool, backupBucketName string, backupPath string, tempDir string) (*backuppb.RestoreCollectionTask, error) {
exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
if err != nil {
log.Error("fail to check has partition", zap.Error(err))
return task, err
}
if !exist {
err = retry.Do(ctx, func() error {
return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
}, retry.Attempts(10), retry.Sleep(1*time.Second))
for _, v := range task.GetCollBackup().GetPartitionBackups() {
partitionBackup := v
log.Info("start restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
// pre-check whether partition exist, if not create it
exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
if err != nil {
log.Error("fail to create partition", zap.Error(err))
log.Error("fail to check has partition", zap.Error(err))
return task, err
}
}
log.Info("create partition",
zap.String("collectionName", targetCollectionName),
zap.String("partitionName", partitionBackup.GetPartitionName()))

// bulk insert
copyAndBulkInsert := func(files []string) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
for i, file := range files {
// empty delta file, no need to copy
if file == "" {
realFiles[i] = file
} else {
log.Debug("Copy temporary restore file", zap.String("from", file), zap.String("to", tempDir+file))
err = retry.Do(ctx, func() error {
return b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
}, retry.Sleep(2*time.Second), retry.Attempts(5))
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket after retry", zap.Error(err))
return err
}
realFiles[i] = tempDir + file
}
if !exist {
err = retry.Do(ctx, func() error {
return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
}, retry.Attempts(10), retry.Sleep(1*time.Second))
if err != nil {
log.Error("fail to create partition", zap.Error(err))
return task, err
}
} else {
realFiles = files
log.Info("create partition",
zap.String("collectionName", targetCollectionName),
zap.String("partitionName", partitionBackup.GetPartitionName()))
}

err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp))
if err != nil {
log.Error("fail to bulk insert to partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()),
zap.Error(err))
return err
}
return nil
}

if task.GetMetaOnly() {
task.Progress = 100
} else {
restoreFileGroups := make([][]string, 0)
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
@@ -665,15 +611,7 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
restoreFileGroups = append(restoreFileGroups, files)
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
Expand All @@ -686,27 +624,81 @@ func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targ
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
restoreFileGroups = append(restoreFileGroups, files)
}
}

// bulk insert
copyAndBulkInsert := func(files []string) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
for i, file := range files {
// empty delta file, no need to copy
if file == "" {
realFiles[i] = file
} else {
log.Debug("Copy temporary restore file", zap.String("from", file), zap.String("to", tempDir+file))
err := retry.Do(ctx, func() error {
return b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
}, retry.Sleep(2*time.Second), retry.Attempts(5))
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket after retry", zap.Error(err))
return err
}
realFiles[i] = tempDir + file
}
}
} else {
realFiles = files
}

err := b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp))
if err != nil {
log.Error("fail to bulk insert to partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()),
zap.Error(err))
return err
}
return nil
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)

for _, value := range restoreFileGroups {
files := value
job := func(ctx context.Context) error {
// todo: progress
return copyAndBulkInsert(files)
}
jobId := b.getRestoreWorkerPool(parentTaskID).SubmitWithId(job)
jobIds = append(jobIds, jobId)
}
}
return task, nil

err := b.getRestoreWorkerPool(parentTaskID).WaitJobs(jobIds)
return task, err
}

//func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targetCollectionName string,
// partitionBackup *backuppb.PartitionBackupInfo, task *backuppb.RestoreCollectionTask, isSameBucket bool, backupBucketName string, backupPath string, tempDir string) (*backuppb.RestoreCollectionTask, error) {
//
// if task.GetMetaOnly() {
// task.Progress = 100
// } else {
//
// task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
// if task.ToRestoreSize == 0 {
// task.Progress = 100
// } else {
// task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
// }
// }
// return task, nil
//}

func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 {
dict := make(map[int64]bool)
res := make([]int64, 0)
11 changes: 10 additions & 1 deletion internal/common/workerpool.go
@@ -21,6 +21,7 @@ type WorkerPool struct {
workerNum int
lim *rate.Limiter

jobNum atomic.Int32
nextId atomic.Int64
jobsStatus sync.Map
jobsError sync.Map
@@ -66,12 +67,13 @@ func (p *WorkerPool) work() error {
return fmt.Errorf("workerpool: wait token %w", err)
}
}

if err := jobWithId.job(p.subCtx); err != nil {
p.jobsError.Store(jobWithId.id, err)
p.jobsStatus.Store(jobWithId.id, "done")
p.jobNum.Dec()
return fmt.Errorf("workerpool: execute job %w", err)
}
p.jobNum.Dec()
p.jobsStatus.Store(jobWithId.id, "done")
return nil
})
@@ -81,14 +83,17 @@ func (p *WorkerPool) work() error {

func (p *WorkerPool) Submit(job Job) {
jobId := p.nextId.Inc()
p.jobNum.Inc()
p.job <- JobWithId{job: job, id: jobId}
//p.jobsStatus.Store(jobId, "started")
}

func (p *WorkerPool) Done() { close(p.job) }
func (p *WorkerPool) Wait() error { return p.g.Wait() }

func (p *WorkerPool) SubmitWithId(job Job) int64 {
jobId := p.nextId.Inc()
p.jobNum.Inc()
p.job <- JobWithId{job: job, id: jobId}
return jobId
}
Expand Down Expand Up @@ -118,3 +123,7 @@ func (p *WorkerPool) WaitJobs(jobIds []int64) error {
}
}
}

func (p *WorkerPool) JobNum() int32 {
return p.jobNum.Load()
}
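
The jobNum counter and JobNum() accessor added here are what the (still commented-out) monitoring goroutine at the top of RestoreBackup would report. As a hedged illustration, a small self-contained poller over any value exposing JobNum() int32; the jobCounter interface, fakePool, logger, and interval are assumptions, not part of the commit.

package main

import (
	"context"
	"log"
	"time"
)

// jobCounter is satisfied by the patched WorkerPool through its new JobNum method.
type jobCounter interface{ JobNum() int32 }

// monitorJobs mirrors the commented-out ticker loop in RestoreBackup:
// it periodically logs how many jobs are still pending in the pool.
func monitorJobs(ctx context.Context, p jobCounter, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Println("job monitor stopped")
			return
		case <-ticker.C:
			log.Printf("restore worker pool: %d jobs pending", p.JobNum())
		}
	}
}

// fakePool is a stand-in so the sketch compiles on its own.
type fakePool struct{ n int32 }

func (f fakePool) JobNum() int32 { return f.n }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	go monitorJobs(ctx, fakePool{n: 7}, time.Second)
	<-ctx.Done() // let the poller run until the timeout fires
}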
