Skip to content

Commit

Permalink
Optimize restore speed with many small segments (#356)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Jun 6, 2024
1 parent acdfde1 commit 32e3e25
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 51 deletions.
11 changes: 6 additions & 5 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
)

const (
BULKINSERT_TIMEOUT = 60 * 60
BULKINSERT_SLEEP_INTERVAL = 5
BACKUP_NAME = "BACKUP_NAME"
COLLECTION_RENAME_SUFFIX = "COLLECTION_RENAME_SUFFIX"
RPS = 1000
BULKINSERT_TIMEOUT = 60 * 60
BULKINSERT_SLEEP_INTERVAL = 5
BACKUP_NAME = "BACKUP_NAME"
COLLECTION_RENAME_SUFFIX = "COLLECTION_RENAME_SUFFIX"
RPS = 1000
BackupSegmentGroupMaxSizeInMB = 256
)

// makes sure BackupContext implements `Backup`
Expand Down
95 changes: 49 additions & 46 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,6 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
}
b.meta.AddCollection(collectionBackup)

//b.updateCollection(levelInfo, collectionBackup)
partitionBackupInfos := make([]*backuppb.PartitionBackupInfo, 0)
partitions, err := b.getMilvusClient().ShowPartitions(b.ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName())
if err != nil {
log.Error("fail to ShowPartitions", zap.Error(err))
Expand Down Expand Up @@ -378,9 +376,6 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
maxChannelBackupTimeStamp = checkpoint.GetTimestamp()
}
}
//collectionBackup.ChannelCheckpoints = channelCheckpoints
//collectionBackup.BackupTimestamp = maxChannelBackupTimeStamp
//collectionBackup.BackupPhysicalTimestamp = uint64(timeOfSeal)
b.meta.UpdateCollection(collectionBackup.Id, collectionBackup.CollectionId,
setCollectionChannelCheckpoints(channelCheckpoints),
setCollectionBackupTimestamp(maxChannelBackupTimeStamp),
Expand Down Expand Up @@ -424,8 +419,8 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
if _, ok := segmentDict[sid]; ok {
delete(segmentDict, sid)
unfilledSegments = append(unfilledSegments, seg)
} else {
log.Info("this may be old segments before flush, skip it", zap.Int64("id", sid))
//} else {
// log.Info("this may be old segments before flush, skip it", zap.Int64("id", sid))
}
}
if len(segmentDict) > 0 {
Expand Down Expand Up @@ -469,7 +464,6 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
NumOfRows: segment.NumRows,
}
b.meta.AddSegment(segmentInfo)
//b.updateSegment(levelInfo, segmentInfo)
partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo)
}

Expand All @@ -488,15 +482,10 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
LoadState: partitionLoadStates[partition.Name],
}
b.meta.AddPartition(partitionBackupInfo)
//b.updatePartition(levelInfo, partitionBackupInfo)
//partitionBackupInfos = append(partitionBackupInfos, partitionBackupInfo)
}

//collectionBackup.PartitionBackups = partitionBackupInfos
//collectionBackup.LoadState = collectionLoadState
b.meta.UpdateCollection(collectionBackup.Id, collectionBackup.CollectionId, setCollectionLoadState(collectionLoadState))
//b.updateCollection(levelInfo, collectionBackup)

partitionBackupInfos := b.meta.GetPartitions(collectionBackup.CollectionId)
log.Info("finish build partition info",
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int("partitionNum", len(partitionBackupInfos)))
Expand All @@ -507,39 +496,58 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
}

b.meta.UpdateCollection(collectionBackup.Id, collectionBackup.CollectionId, setCollectionSize(collectionBackupSize))
//collectionBackup.Size = collectionBackupSize
//b.updateCollection(levelInfo, collectionBackup)
return nil
}

func (b *BackupContext) backupCollectionExecute(ctx context.Context, collectionBackup *backuppb.CollectionBackupInfo) error {
log.Info("backupCollectionExecute", zap.Any("collectionMeta", collectionBackup.String()))
var segmentBackupInfos []*backuppb.SegmentBackupInfo

for _, partition := range b.meta.GetPartitions(collectionBackup.CollectionId) {
for _, segment := range b.meta.GetSegments(partition.GetPartitionId()) {
var segmentBackupInfos []*backuppb.SegmentBackupInfo
var currentSize int64 = 0
var groupID int64 = 1
segments := b.meta.GetSegments(partition.GetPartitionId())
for _, v := range segments {
segment := v
err := b.fillSegmentBackupInfo(ctx, segment)
if err != nil {
log.Error("Fail to fill segment backup info", zap.Error(err))
return err
}
if currentSize > BackupSegmentGroupMaxSizeInMB*1024*1024 { // 256MB
groupID++
currentSize = 0
}
currentSize = currentSize + segment.GetSize()
b.meta.UpdateSegment(segment.GetPartitionId(), segment.GetSegmentId(), setGroupID(groupID))
segmentBackupInfos = append(segmentBackupInfos, segment)
}
}
log.Info("Begin copy data",
zap.String("dbName", collectionBackup.GetDbName()),
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int("segmentNum", len(segmentBackupInfos)))
log.Info("Begin copy data",
zap.String("dbName", collectionBackup.GetDbName()),
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int64("collectionID", partition.GetCollectionId()),
zap.Int64("partitionID", partition.GetPartitionId()),
zap.Int("segmentNum", len(segmentBackupInfos)))

sort.SliceStable(segmentBackupInfos, func(i, j int) bool {
return segmentBackupInfos[i].Size < segmentBackupInfos[j].Size
})
err := b.copySegments(ctx, segmentBackupInfos)
if err != nil {
return err
sort.SliceStable(segmentBackupInfos, func(i, j int) bool {
return segmentBackupInfos[i].Size < segmentBackupInfos[j].Size
})

segmentIDs := lo.Map(segmentBackupInfos, func(segment *backuppb.SegmentBackupInfo, _ int) int64 {
return segment.GetSegmentId()
})
err := b.copySegments(ctx, segmentIDs)
if err != nil {
return err
}
}

//collectionBackup.EndTime = time.Now().Unix()
b.meta.UpdateCollection(collectionBackup.Id, collectionBackup.CollectionId, setCollectionEndTime(time.Now().Unix()))

log.Info("Finish copy data",
zap.String("dbName", collectionBackup.GetDbName()),
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int("segmentNum", len(segmentBackupInfos)))
zap.String("collectionName", collectionBackup.GetCollectionName()))
return nil
}

Expand Down Expand Up @@ -740,22 +748,12 @@ func (b *BackupContext) writeBackupInfoMeta(ctx context.Context, id string) erro
return nil
}

func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo) error {
func (b *BackupContext) copySegments(ctx context.Context, segmentIDs []int64) error {
jobIds := make([]int64, 0)
for _, v := range segments {
segment := v
log := log.With(zap.Int64("collection_id", segment.GetCollectionId()),
zap.Int64("partition_id", segment.GetPartitionId()),
zap.Int64("segment_id", segment.GetSegmentId()),
zap.Int64("group_id", segment.GetGroupId()))
log.Debug("copy segment")
err := b.fillSegmentBackupInfo(ctx, segment)
if err != nil {
log.Error("Fail to fill segment backup info", zap.Error(err))
return err
}
for _, segmentID := range segmentIDs {
segment := segmentID
job := func(ctx context.Context) error {
return b.copySegment(ctx, segment.GetSegmentId())
return b.copySegment(ctx, segment)
}
jobId := b.getCopyDataWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
Expand All @@ -767,6 +765,11 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S

func (b *BackupContext) copySegment(ctx context.Context, segmentID int64) error {
segment := b.meta.GetSegment(segmentID)
log := log.With(zap.Int64("collection_id", segment.GetCollectionId()),
zap.Int64("partition_id", segment.GetPartitionId()),
zap.Int64("segment_id", segment.GetSegmentId()),
zap.Int64("group_id", segment.GetGroupId()))
log.Info("copy segment")
backupInfo := b.meta.GetBackupBySegmentID(segmentID)
dstPath := BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())
// generate target path
Expand Down Expand Up @@ -963,7 +966,7 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup

//segmentBackupInfo.Binlogs = insertLogs
//segmentBackupInfo.Deltalogs = deltaLogs
//segmentBackupInfo.Size = size
segmentBackupInfo.Size = size
b.meta.UpdateSegment(segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId(), setSegmentBinlogs(insertLogs), setSegmentDeltaBinlogs(deltaLogs), setSegmentSize(size))
log.Debug("fill segment info", zap.Int64("segId", segmentBackupInfo.GetSegmentId()), zap.Int64("size", size))
return nil
Expand Down
6 changes: 6 additions & 0 deletions core/backup_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,12 @@ func setSegmentSize(size int64) SegmentOpt {
}
}

// setGroupID returns a SegmentOpt that assigns the given backup group ID to a
// segment. Segments sharing a group ID are copied together, which batches many
// small segments into larger transfer units (see BackupSegmentGroupMaxSizeInMB).
func setGroupID(gid int64) SegmentOpt {
	return func(s *backuppb.SegmentBackupInfo) { s.GroupId = gid }
}

func setSegmentBinlogs(binlogs []*backuppb.FieldBinlog) SegmentOpt {
return func(segment *backuppb.SegmentBackupInfo) {
segment.Binlogs = binlogs
Expand Down

0 comments on commit 32e3e25

Please sign in to comment.