Skip to content

Commit

Permalink
Support gc pause during backup (#291)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Feb 5, 2024
1 parent a1b1a33 commit 3ef8e81
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 204 deletions.
9 changes: 8 additions & 1 deletion configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ milvus:
tlsMode: 0
user: "root"
password: "Milvus"

useSSL: false # Optional, http API protocol, used to pause/resume GC of the Milvus cluster
httpPort: 9091 # Optional, http API port, used to pause/resume GC of the Milvus cluster

# Related configuration of minio, which is responsible for data persistence for Milvus.
minio:
Expand Down Expand Up @@ -54,4 +57,8 @@ backup:
restoreCollection: 2

# keep temporary files during restore, only use to debug
keepTempFiles: false
keepTempFiles: false

# Pause GC during backup through Milvus Http API.
pauseGcWhenBackup: true
pauseGcSeconds: 7200
58 changes: 58 additions & 0 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -536,10 +539,65 @@ func (b *BackupContext) backupCollectionExecute(ctx context.Context, backupInfo
return nil
}

func (b *BackupContext) pauseMilvusGC(ctx context.Context) {
httpAddress := b.params.MilvusCfg.Address + ":" + b.params.MilvusCfg.HttpPort
if b.params.MilvusCfg.EnableSSL {
httpAddress = "https://" + httpAddress
} else {
httpAddress = "http://" + httpAddress
}
pauseAPI := "/management/datacoord/garbage_collection/pause"
params := url.Values{}
params.Add("pause_seconds", strconv.Itoa(b.params.BackupCfg.PauseGcSeconds))
fullURL := fmt.Sprintf("%s?%s", httpAddress+pauseAPI, params.Encode())
response, err := http.Get(fullURL)
if err != nil {
log.Error("Pause Milvus GC Error:", zap.Error(err))
return
}
defer response.Body.Close()
// Read the response body
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Error("Read response Error:", zap.Error(err))
return
}
log.Info("Pause Milvus GC response", zap.String("response", string(body)))
}

func (b *BackupContext) resumeMilvusGC(ctx context.Context) {
httpAddress := b.params.MilvusCfg.Address + ":" + b.params.MilvusCfg.HttpPort
if b.params.MilvusCfg.EnableSSL {
httpAddress = "https://" + httpAddress
} else {
httpAddress = "http://" + httpAddress
}
pauseAPI := "/management/datacoord/garbage_collection/resume"
fullURL := httpAddress + pauseAPI
response, err := http.Get(fullURL)
if err != nil {
log.Error("Resume Milvus GC Error:", zap.Error(err))
return
}
// Read the response body
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Error("Read response Error:", zap.Error(err))
return
}
log.Info("Resume Milvus GC response", zap.String("response", string(body)))
}

func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) {
b.mu.Lock()
defer b.mu.Unlock()

// pause GC
if b.params.BackupCfg.PauseGcWhenBackup {
b.pauseMilvusGC(ctx)
defer b.resumeMilvusGC(ctx)
}

backupInfo.BackupTimestamp = uint64(time.Now().UnixNano() / int64(time.Millisecond))
backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_EXECUTING

Expand Down
32 changes: 31 additions & 1 deletion core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ type BackupConfig struct {
BackupCopyDataParallelism int
RestoreParallelism int

KeepTempFiles bool
KeepTempFiles bool
PauseGcWhenBackup bool
PauseGcSeconds int
}

func (p *BackupConfig) init(base *BaseTable) {
Expand All @@ -49,6 +51,8 @@ func (p *BackupConfig) init(base *BaseTable) {
p.initRestoreParallelism()
p.initBackupCopyDataParallelism()
p.initKeepTempFiles()
p.initPauseGcWhenBackup()
p.initPauseGcSeconds()
}

func (p *BackupConfig) initMaxSegmentGroupSize() {
Expand Down Expand Up @@ -79,6 +83,16 @@ func (p *BackupConfig) initKeepTempFiles() {
p.KeepTempFiles, _ = strconv.ParseBool(keepTempFiles)
}

func (p *BackupConfig) initPauseGcWhenBackup() {
pauseGcWhenBackup := p.Base.LoadWithDefault("backup.pauseGcWhenBackup", "false")
p.PauseGcWhenBackup, _ = strconv.ParseBool(pauseGcWhenBackup)
}

func (p *BackupConfig) initPauseGcSeconds() {
size := p.Base.ParseIntWithDefault("backup.pauseGcSeconds", 7200)
p.PauseGcSeconds = size
}

type MilvusConfig struct {
Base *BaseTable

Expand All @@ -88,6 +102,8 @@ type MilvusConfig struct {
Password string
AuthorizationEnabled bool
TLSMode int
HttpPort string
EnableSSL bool
}

func (p *MilvusConfig) init(base *BaseTable) {
Expand All @@ -99,6 +115,8 @@ func (p *MilvusConfig) init(base *BaseTable) {
p.initPassword()
p.initAuthorizationEnabled()
p.initTLSMode()
p.initHttpPort()
p.initSSLEnabled()
}

func (p *MilvusConfig) initAddress() {
Expand Down Expand Up @@ -141,6 +159,18 @@ func (p *MilvusConfig) initTLSMode() {
p.TLSMode = p.Base.ParseIntWithDefault("milvus.tlsMode", 0)
}

func (p *MilvusConfig) initHttpPort() {
port, err := p.Base.Load("milvus.httpPort")
if err != nil {
panic(err)
}
p.HttpPort = port
}

func (p *MilvusConfig) initSSLEnabled() {
p.EnableSSL = p.Base.ParseBool("milvus.useSSL", false)
}

// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
const (
Expand Down
4 changes: 4 additions & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ message CreateBackupRequest {
bool force = 6;
// only backup meta, including collection schema and index info
bool meta_only = 7;
// if true, stop GC to avoid the data compacted and GCed during backup, use it when the data to backup is very large.
bool stop_gc = 8;
// gc pause seconds, set it larger than the time cost of backup
int64 gc_pause = 9;
}

/**
Expand Down
Loading

0 comments on commit 3ef8e81

Please sign in to comment.