Skip to content

Commit

Permalink
Merge pull request #226 from wayblink/azure2
Browse files Browse the repository at this point in the history
Support cross Azure container backup
  • Loading branch information
lentitude2tk authored Oct 23, 2023
2 parents c94d9b1 + 48a0620 commit 32f7b92
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 100 deletions.
4 changes: 4 additions & 0 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ minio:
bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
rootPath: "files" # Milvus storage root path in MinIO/S3, make it the same as your milvus instance

# Only used for Azure: credentials for accessing the backup storage
backupAccessKeyID: minioadmin # access key ID for the backup storage
backupSecretAccessKey: minioadmin # secret access key for the backup storage

backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data will be stored in backupBucketName/backupRootPath
backupRootPath: "backup" # Root path to store backup data. Backup data will be stored in backupBucketName/backupRootPath

Expand Down
4 changes: 3 additions & 1 deletion core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,9 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
}

err = b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName()))
if err != nil {
return err
}
b.refreshBackupCache(backupInfo)

collectionBackup.Size = collectionBackupSize
Expand Down Expand Up @@ -1120,7 +1123,6 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
var rootPath string

if b.params.MinioCfg.RootPath != "" {
log.Debug("params.MinioCfg.RootPath", zap.String("params.MinioCfg.RootPath", b.params.MinioCfg.RootPath))
rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath)
} else {
rootPath = ""
Expand Down
2 changes: 1 addition & 1 deletion core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}
}

tempDir := "restore-temp-" + parentTaskID + SEPERATOR
tempDir := fmt.Sprintf("restore-temp-%s-%s-%s%s", parentTaskID, task.TargetDbName, task.TargetCollectionName, SEPERATOR)
isSameBucket := b.milvusBucketName == backupBucketName
// clean the temporary file
defer func() {
Expand Down
18 changes: 16 additions & 2 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ type MinioConfig struct {
CloudProvider string
IAMEndpoint string

BackupBucketName string
BackupRootPath string
BackupAccessKeyID string
BackupSecretAccessKey string
BackupBucketName string
BackupRootPath string

StorageType string
}
Expand All @@ -186,6 +188,8 @@ func (p *MinioConfig) init(base *BaseTable) {
p.initCloudProvider()
p.initIAMEndpoint()

p.initBackupAccessKeyID()
p.initBackupSecretAccessKey()
p.initBackupBucketName()
p.initBackupRootPath()
}
Expand Down Expand Up @@ -246,6 +250,16 @@ func (p *MinioConfig) initIAMEndpoint() {
p.IAMEndpoint = iamEndpoint
}

// initBackupAccessKeyID populates BackupAccessKeyID from the
// "minio.backupAccessKeyID" setting, passing DefaultMinioAccessKey as the
// default to LoadWithDefault.
func (p *MinioConfig) initBackupAccessKeyID() {
	p.BackupAccessKeyID = p.Base.LoadWithDefault("minio.backupAccessKeyID", DefaultMinioAccessKey)
}

// initBackupSecretAccessKey populates BackupSecretAccessKey from the
// "minio.backupSecretAccessKey" setting, passing DefaultMinioSecretAccessKey
// as the default to LoadWithDefault.
func (p *MinioConfig) initBackupSecretAccessKey() {
	p.BackupSecretAccessKey = p.Base.LoadWithDefault("minio.backupSecretAccessKey", DefaultMinioSecretAccessKey)
}

func (p *MinioConfig) initBackupBucketName() {
bucketName := p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName)
p.BackupBucketName = bucketName
Expand Down
99 changes: 40 additions & 59 deletions core/storage/azure_chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ import (

// AzureChunkManager is responsible for reading and writing data stored in Azure Blob Storage.
type AzureChunkManager struct {
client *AzureObjectStorage
aos *AzureObjectStorage

//cli *azblob.Client
// ctx context.Context
bucketName string
rootPath string
//bucketName string
//rootPath string
}

var _ ChunkManager = (*AzureChunkManager)(nil)

func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, error) {
client, err := newAzureObjectStorageWithConfig(ctx, c)
aos, err := newAzureObjectStorageWithConfig(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -58,19 +58,19 @@ func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, e
// return nil, err
//}
mcm := &AzureChunkManager{
client: client,
aos: aos,
//cli: cli,
bucketName: c.bucketName,
rootPath: strings.TrimLeft(c.rootPath, "/"),
//bucketName: c.bucketName,
//rootPath: strings.TrimLeft(c.rootPath, "/"),
}
log.Info("Azure chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", mcm.RootPath()))
log.Info("Azure chunk manager init success.")
return mcm, nil
}

// RootPath returns minio root path.
func (mcm *AzureChunkManager) RootPath() string {
return mcm.rootPath
}
//func (mcm *AzureChunkManager) RootPath() string {
// return mcm.rootPath
//}

func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error {
objectkeys, _, err := mcm.ListWithPrefix(ctx, fromBucketName, fromPath, true)
Expand All @@ -80,7 +80,7 @@ func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, t
}
for _, objectkey := range objectkeys {
dstObjectKey := strings.Replace(objectkey, fromPath, toPath, 1)
err := mcm.client.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey)
err := mcm.aos.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey)
if err != nil {
log.Error("copyObject error", zap.String("srcObjectKey", objectkey), zap.String("dstObjectKey", dstObjectKey), zap.Error(err))
return err
Expand Down Expand Up @@ -148,22 +148,26 @@ func (mcm *AzureChunkManager) MultiWrite(ctx context.Context, bucketName string,

// Exist checks whether the chunk is saved in Azure Blob Storage.
func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) {
_, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
objs, err := mcm.aos.ListObjects(ctx, bucketName, filePath, true)
if err != nil {
if IsErrNoSuchKey(err) {
return false, nil
}
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
return false, err
}
return true, nil
if len(objs) > 0 {
return true, nil
} else {
return false, nil
}
}

// Read reads the data from Azure Blob Storage if it exists.
func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, filePath string) ([]byte, error) {
object, err := mcm.getObject(ctx, bucketName, filePath, int64(0), int64(0))
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
log.Warn("failed to get object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
defer object.Close()
Expand All @@ -179,7 +183,7 @@ func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, fileP
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
return nil, err
}
size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
size, err := mcm.getObjectSize(ctx, bucketName, filePath)
if err != nil {
log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -313,7 +317,7 @@ func (mcm *AzureChunkManager) RemoveWithPrefix(ctx context.Context, bucketName s

// ListWithPrefix returns objects with provided prefix.
func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error) {
objects, err := mcm.listObjects(ctx, bucketName, prefix, false)
objects, err := mcm.listObjects(ctx, bucketName, prefix, true)
if err != nil {
return nil, nil, err
}
Expand All @@ -327,9 +331,9 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
return objectsKeys, sizes, nil
} else {
var objectsKeys []string
var sizes []int64
sizesDict := make(map[string]int64, 0)
objectsKeysDict := make(map[string]bool, 0)
for object, _ := range objects {
for object, size := range objects {
keyWithoutPrefix := strings.Replace(object, prefix, "", 1)
if strings.Contains(keyWithoutPrefix, "/") {
var key string
Expand All @@ -340,52 +344,29 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
}
if _, exist := objectsKeysDict[key]; !exist {
objectsKeys = append(objectsKeys, key)
sizes = append(sizes, 0)
sizesDict[key] = size
objectsKeysDict[key] = true
} else {
sizesDict[key] = size + sizesDict[key]
}
} else {
key := prefix + keyWithoutPrefix
if _, exist := objectsKeysDict[key]; !exist {
objectsKeys = append(objectsKeys, key)
sizes = append(sizes, 0)
sizesDict[key] = size
objectsKeysDict[key] = true
} else {
sizesDict[key] = size + sizesDict[key]
}
}
}
var sizes []int64
for _, objectKey := range objectsKeys {
sizes = append(sizes, sizesDict[objectKey])
}

return objectsKeys, sizes, nil
}

//var objectsKeys []string
//var sizes []int64
//tasks := list.New()
//tasks.PushBack(prefix)
//for tasks.Len() > 0 {
// e := tasks.Front()
// pre := e.Value.(string)
// tasks.Remove(e)
//
// // TODO add concurrent call if performance matters
// // only return current level per call
// objects, err := mcm.listObjects(ctx, bucketName, pre, false)
// if err != nil {
// return nil, nil, err
// }
//
// for object, contentLength := range objects {
// // with tailing "/", object is a "directory"
// if strings.HasSuffix(object, "/") && recursive {
// // enqueue when recursive is true
// if object != pre {
// tasks.PushBack(object)
// }
// continue
// }
// objectsKeys = append(objectsKeys, object)
// sizes = append(sizes, contentLength)
// }
//}
//
//return objectsKeys, sizes, nil
}

func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
Expand All @@ -395,7 +376,7 @@ func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectN
//}
//return resp.Body, nil

reader, err := mcm.client.GetObject(ctx, bucketName, objectName, offset, size)
reader, err := mcm.aos.GetObject(ctx, bucketName, objectName, offset, size)
switch err := err.(type) {
case *azcore.ResponseError:
if err.ErrorCode == string(bloberror.BlobNotFound) {
Expand All @@ -410,12 +391,12 @@ func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectN
}

func (mcm *AzureChunkManager) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {
err := mcm.client.PutObject(ctx, bucketName, objectName, reader, objectSize)
err := mcm.aos.PutObject(ctx, bucketName, objectName, reader, objectSize)
return err
}

func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, objectName string) (int64, error) {
info, err := mcm.client.StatObject(ctx, bucketName, objectName)
info, err := mcm.aos.StatObject(ctx, bucketName, objectName)

switch err := err.(type) {
case *azcore.ResponseError:
Expand All @@ -432,11 +413,11 @@ func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, obj
}

func (mcm *AzureChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) {
res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
res, err := mcm.aos.ListObjects(ctx, bucketName, prefix, recursive)
return res, err
}

func (mcm *AzureChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error {
err := mcm.client.RemoveObject(ctx, bucketName, objectName)
err := mcm.aos.RemoveObject(ctx, bucketName, objectName)
return err
}
Loading

0 comments on commit 32f7b92

Please sign in to comment.