Skip to content

Commit

Permalink
async restore
Browse files Browse the repository at this point in the history
  • Loading branch information
korotkov-aerospike committed Aug 6, 2024
1 parent 621a263 commit 9597dd8
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 36 deletions.
1 change: 1 addition & 0 deletions pkg/service/backup_list_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type BackupListReader interface {
ReadClusterConfiguration(path string) ([]byte, error)

// FindLastFullBackup returns last full backup prior to given time.
// Each element of an array is backup of a namespace.
FindLastFullBackup(toTime time.Time) ([]model.BackupDetails, error)

// FindIncrementalBackupsForNamespace returns all incremental backups in given range, sorted by time.
Expand Down
64 changes: 47 additions & 17 deletions pkg/service/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/aerospike/backup-go"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -58,12 +59,21 @@ func (r *dataRestorer) Restore(request *model.RestoreRequestInternal) (RestoreJo
}
defer r.asClientCreator.Close(client)

restoreResult, err := r.restoreService.RestoreRun(ctx, client, request)
handler, err := r.restoreService.RestoreRun(ctx, client, request)
if err != nil {
r.restoreJobs.setFailed(jobID, fmt.Errorf("failed to start restore operation: %w", err))
return
}

// Wait for the restore operation to complete
err = handler.Wait(ctx)
if err != nil {
r.restoreJobs.setFailed(jobID, fmt.Errorf("failed restore operation: %w", err))
return
}
r.restoreJobs.increaseStats(jobID, restoreResult)

//restoreResult := handler.GetStats()
//r.restoreJobs.increaseStats(jobID, restoreResult)
r.restoreJobs.setDone(jobID)
}()

Expand Down Expand Up @@ -137,14 +147,12 @@ func (r *dataRestorer) restoreNamespace(
client *aerospike.Client,
backend BackupListReader,
request *model.RestoreTimestampRequest,
jobID RestoreJobID, fullBackup model.BackupDetails,
jobID RestoreJobID,
fullBackup model.BackupDetails,
) error {
result, err := r.restoreFromPath(ctx, client, request, fullBackup.Key)
if err != nil {
return fmt.Errorf("could not restore full backup for namespace %s: %v", fullBackup.Namespace, err)
}
r.restoreJobs.increaseStats(jobID, result)
allBackups := []model.BackupDetails{fullBackup}

// Find incremental backups
bounds, err := model.NewTimeBounds(&fullBackup.Created, ptr.Time(time.UnixMilli(request.Time)))
if err != nil {
return err
Expand All @@ -154,13 +162,35 @@ func (r *dataRestorer) restoreNamespace(
if err != nil {
return fmt.Errorf("could not find incremental backups for namespace %s: %v", fullBackup.Namespace, err)
}
slog.Info("Apply incremental backups", "size", len(incrementalBackups))
for _, incrBackup := range incrementalBackups {
result, err := r.restoreFromPath(ctx, client, request, incrBackup.Key)

// Append incremental backups to allBackups
allBackups = append(allBackups, incrementalBackups...)

slog.Info("Preparing to restore backups",
"namespace", fullBackup.Namespace,
"fullBackup", fullBackup.Created,
"incrementalCount", len(incrementalBackups))

// Now restore all backups in order
for i, backup := range allBackups {
slog.Info("Restoring backup",
"namespace", backup.Namespace,
"created", backup.Created,
"progress", fmt.Sprintf("%d/%d", i+1, len(allBackups)))

handler, err := r.restoreFromPath(ctx, client, request, backup.Key)
if err != nil {
return fmt.Errorf("could not restore incremental backup %s: %v", *incrBackup.Key, err)
return fmt.Errorf("could not start restore from backup created at %s: %v", backup.Created, err)
}
r.restoreJobs.increaseStats(jobID, result)

// Wait for the restore operation to complete
err = handler.Wait(ctx)
if err != nil {
return fmt.Errorf("error during backup restore created at %s: %v", backup.Created, err)
}

//result := handler.GetStats()
//r.restoreJobs.increaseStats(jobID, &result)
}

return nil
Expand All @@ -171,19 +201,19 @@ func (r *dataRestorer) restoreFromPath(
client *aerospike.Client,
request *model.RestoreTimestampRequest,
backupPath *string,
) (*model.RestoreResult, error) {
) (*backup.RestoreHandler, error) {
restoreRequest := r.toRestoreRequest(request)
restoreResult, err := r.restoreService.RestoreRun(ctx,
handler, err := r.restoreService.RestoreRun(ctx,
client,
&model.RestoreRequestInternal{
RestoreRequest: *restoreRequest,
Dir: backupPath,
})
if err != nil {
return nil, fmt.Errorf("could not restore backup at %s: %w", *backupPath, err)
return nil, fmt.Errorf("could not start restore from backup at %s: %w", *backupPath, err)
}

return restoreResult, nil
return handler, nil
}

func (r *dataRestorer) toRestoreRequest(request *model.RestoreTimestampRequest) *model.RestoreRequest {
Expand Down
20 changes: 2 additions & 18 deletions pkg/shared/restore_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (r *RestoreGo) RestoreRun(
ctx context.Context,
client *a.Client,
restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error) {
) (*backup.RestoreHandler, error) {
var err error
backupClient, err := backup.NewClient(client, "1", slog.Default())
if err != nil {
Expand All @@ -48,23 +48,7 @@ func (r *RestoreGo) RestoreRun(
return nil, fmt.Errorf("failed to start restore, %w", err)
}

err = handler.Wait(ctx)
if err != nil {
return nil, fmt.Errorf("error during restore, %w", err)
}

stats := handler.GetStats()
return &model.RestoreResult{
TotalRecords: stats.GetReadRecords(),
InsertedRecords: stats.GetRecordsInserted(),
IndexCount: uint64(stats.GetSIndexes()),
UDFCount: uint64(stats.GetUDFs()),
FresherRecords: stats.GetRecordsFresher(),
SkippedRecords: stats.GetRecordsSkipped(),
ExistedRecords: stats.GetRecordsExisted(),
ExpiredRecords: stats.GetRecordsExpired(),
TotalBytes: stats.GetTotalBytesRead(),
}, nil
return handler, nil
}

//nolint:funlen
Expand Down
2 changes: 1 addition & 1 deletion pkg/shared/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ type Restore interface {
ctx context.Context,
client *aerospike.Client,
restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error)
) (*backup.RestoreHandler, error)
}

0 comments on commit 9597dd8

Please sign in to comment.