diff --git a/configs/backup.yaml b/configs/backup.yaml
index 8c8767b6..dad734d4 100644
--- a/configs/backup.yaml
+++ b/configs/backup.yaml
@@ -34,6 +34,10 @@ minio:
   bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
   rootPath: "files" # Milvus storage root path in MinIO/S3, make it the same as your milvus instance
 
+  # only for azure
+  backupAccessKeyID: minioadmin # accessKeyID for the backup storage account
+  backupSecretAccessKey: minioadmin # secretAccessKey for the backup storage account
+
   backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data will store to backupBucketName/backupRootPath
   backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath
 
diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go
index 82ed1249..10aa7c66 100644
--- a/core/backup_impl_create_backup.go
+++ b/core/backup_impl_create_backup.go
@@ -521,6 +521,9 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
 	}
 
 	err = b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName()))
+	if err != nil {
+		return err
+	}
 	b.refreshBackupCache(backupInfo)
 
 	collectionBackup.Size = collectionBackupSize
@@ -1120,7 +1123,6 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
 
 	var rootPath string
 	if b.params.MinioCfg.RootPath != "" {
-		log.Debug("params.MinioCfg.RootPath", zap.String("params.MinioCfg.RootPath", b.params.MinioCfg.RootPath))
 		rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath)
 	} else {
 		rootPath = ""
diff --git a/core/backup_impl_restore_backup.go b/core/backup_impl_restore_backup.go
index 40e809db..ff8bb112 100644
--- a/core/backup_impl_restore_backup.go
+++ b/core/backup_impl_restore_backup.go
@@ -449,7 +449,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
 		}
 	}
 
-	tempDir := "restore-temp-" + parentTaskID + SEPERATOR
+	tempDir := fmt.Sprintf("restore-temp-%s-%s-%s%s", parentTaskID, task.TargetDbName, task.TargetCollectionName, SEPERATOR)
 	isSameBucket := b.milvusBucketName == backupBucketName
 	// clean the temporary file
 	defer func() {
diff --git a/core/paramtable/params.go b/core/paramtable/params.go
index 4befad13..fea3c5c7 100644
--- a/core/paramtable/params.go
+++ b/core/paramtable/params.go
@@ -165,8 +165,10 @@ type MinioConfig struct {
 	CloudProvider string
 	IAMEndpoint   string
 
-	BackupBucketName string
-	BackupRootPath   string
+	BackupAccessKeyID     string
+	BackupSecretAccessKey string
+	BackupBucketName      string
+	BackupRootPath        string
 
 	StorageType string
 }
@@ -186,6 +188,8 @@ func (p *MinioConfig) init(base *BaseTable) {
 	p.initCloudProvider()
 	p.initIAMEndpoint()
 
+	p.initBackupAccessKeyID()
+	p.initBackupSecretAccessKey()
 	p.initBackupBucketName()
 	p.initBackupRootPath()
 }
@@ -246,6 +250,16 @@ func (p *MinioConfig) initIAMEndpoint() {
 	p.IAMEndpoint = iamEndpoint
 }
 
+func (p *MinioConfig) initBackupAccessKeyID() {
+	keyID := p.Base.LoadWithDefault("minio.backupAccessKeyID", DefaultMinioAccessKey)
+	p.BackupAccessKeyID = keyID
+}
+
+func (p *MinioConfig) initBackupSecretAccessKey() {
+	key := p.Base.LoadWithDefault("minio.backupSecretAccessKey", DefaultMinioSecretAccessKey)
+	p.BackupSecretAccessKey = key
+}
+
 func (p *MinioConfig) initBackupBucketName() {
 	bucketName := p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName)
 	p.BackupBucketName = bucketName
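Note on the params.go hunk above: initBackupAccessKeyID and initBackupSecretAccessKey fall back to the static defaults DefaultMinioAccessKey/DefaultMinioSecretAccessKey, not to the primary minio.accessKeyID/minio.secretAccessKey values already parsed from the same config. A minimal sketch of that resolution order, with a hypothetical baseTable standing in for the real BaseTable:

package main

import "fmt"

const (
	DefaultMinioAccessKey       = "minioadmin"
	DefaultMinioSecretAccessKey = "minioadmin"
)

// baseTable is a stand-in for paramtable.BaseTable, for illustration only.
type baseTable map[string]string

// LoadWithDefault mirrors the lookup semantics assumed by the patch:
// return the configured value if present, otherwise the supplied default.
func (b baseTable) LoadWithDefault(key, def string) string {
	if v, ok := b[key]; ok && v != "" {
		return v
	}
	return def
}

func main() {
	cfg := baseTable{"minio.accessKeyID": "primary-account"}
	// minio.backupAccessKeyID is unset, so it resolves to the static
	// default "minioadmin", not to "primary-account".
	fmt.Println(cfg.LoadWithDefault("minio.backupAccessKeyID", DefaultMinioAccessKey))
}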
diff --git a/core/storage/azure_chunk_manager.go b/core/storage/azure_chunk_manager.go
index 9d774cf9..622d6eb8 100644
--- a/core/storage/azure_chunk_manager.go
+++ b/core/storage/azure_chunk_manager.go
@@ -37,18 +37,18 @@ import (
 
 // AzureChunkManager is responsible for read and write data stored in minio.
 type AzureChunkManager struct {
-	client *AzureObjectStorage
+	aos *AzureObjectStorage
 
 	//cli *azblob.Client
 	//	ctx        context.Context
-	bucketName string
-	rootPath   string
+	//bucketName string
+	//rootPath   string
 }
 
 var _ ChunkManager = (*AzureChunkManager)(nil)
 
 func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, error) {
-	client, err := newAzureObjectStorageWithConfig(ctx, c)
+	aos, err := newAzureObjectStorageWithConfig(ctx, c)
 	if err != nil {
 		return nil, err
 	}
@@ -58,19 +58,19 @@ func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, e
 	//	return nil, err
 	//}
 	mcm := &AzureChunkManager{
-		client: client,
+		aos: aos,
 		//cli:    cli,
-		bucketName: c.bucketName,
-		rootPath:   strings.TrimLeft(c.rootPath, "/"),
+		//bucketName: c.bucketName,
+		//rootPath:   strings.TrimLeft(c.rootPath, "/"),
 	}
-	log.Info("Azure chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", mcm.RootPath()))
+	log.Info("Azure chunk manager init success.")
 	return mcm, nil
 }
 
 // RootPath returns minio root path.
-func (mcm *AzureChunkManager) RootPath() string {
-	return mcm.rootPath
-}
+//func (mcm *AzureChunkManager) RootPath() string {
+//	return mcm.rootPath
+//}
 
 func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error {
 	objectkeys, _, err := mcm.ListWithPrefix(ctx, fromBucketName, fromPath, true)
@@ -80,7 +80,7 @@ func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, t
 	}
 	for _, objectkey := range objectkeys {
 		dstObjectKey := strings.Replace(objectkey, fromPath, toPath, 1)
-		err := mcm.client.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey)
+		err := mcm.aos.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey)
 		if err != nil {
 			log.Error("copyObject error", zap.String("srcObjectKey", objectkey), zap.String("dstObjectKey", dstObjectKey), zap.Error(err))
 			return err
@@ -148,22 +148,26 @@ func (mcm *AzureChunkManager) MultiWrite(ctx context.Context, bucketName string,
 
 // Exist checks whether chunk is saved to minio storage.
 func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) {
-	_, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
+	objs, err := mcm.aos.ListObjects(ctx, bucketName, filePath, true)
 	if err != nil {
 		if IsErrNoSuchKey(err) {
 			return false, nil
 		}
-		log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
+		log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
 		return false, err
 	}
-	return true, nil
+	if len(objs) > 0 {
+		return true, nil
+	} else {
+		return false, nil
+	}
 }
 
 // Read reads the minio storage data if exists.
 func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, filePath string) ([]byte, error) {
 	object, err := mcm.getObject(ctx, bucketName, filePath, int64(0), int64(0))
 	if err != nil {
-		log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
+		log.Warn("failed to get object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
 		return nil, err
 	}
 	defer object.Close()
@@ -179,7 +183,7 @@ func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, fileP
 		log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
 		return nil, err
 	}
-	size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
+	size, err := mcm.getObjectSize(ctx, bucketName, filePath)
 	if err != nil {
 		log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
 		return nil, err
@@ -313,7 +317,7 @@ func (mcm *AzureChunkManager) RemoveWithPrefix(ctx context.Context, bucketName s
 
 // ListWithPrefix returns objects with provided prefix.
 func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error) {
-	objects, err := mcm.listObjects(ctx, bucketName, prefix, false)
+	objects, err := mcm.listObjects(ctx, bucketName, prefix, true)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -327,9 +331,9 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
 		return objectsKeys, sizes, nil
 	} else {
 		var objectsKeys []string
-		var sizes []int64
+		sizesDict := make(map[string]int64, 0)
 		objectsKeysDict := make(map[string]bool, 0)
-		for object, _ := range objects {
+		for object, size := range objects {
 			keyWithoutPrefix := strings.Replace(object, prefix, "", 1)
 			if strings.Contains(keyWithoutPrefix, "/") {
 				var key string
@@ -340,52 +344,29 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
 				}
 				if _, exist := objectsKeysDict[key]; !exist {
 					objectsKeys = append(objectsKeys, key)
-					sizes = append(sizes, 0)
+					sizesDict[key] = size
 					objectsKeysDict[key] = true
+				} else {
+					sizesDict[key] = size + sizesDict[key]
 				}
 			} else {
 				key := prefix + keyWithoutPrefix
 				if _, exist := objectsKeysDict[key]; !exist {
 					objectsKeys = append(objectsKeys, key)
-					sizes = append(sizes, 0)
+					sizesDict[key] = size
 					objectsKeysDict[key] = true
+				} else {
+					sizesDict[key] = size + sizesDict[key]
 				}
 			}
 		}
+		var sizes []int64
+		for _, objectKey := range objectsKeys {
+			sizes = append(sizes, sizesDict[objectKey])
+		}
+
 		return objectsKeys, sizes, nil
 	}
-
-	//var objectsKeys []string
-	//var sizes []int64
-	//tasks := list.New()
-	//tasks.PushBack(prefix)
-	//for tasks.Len() > 0 {
-	//	e := tasks.Front()
-	//	pre := e.Value.(string)
-	//	tasks.Remove(e)
-	//
-	//	// TODO add concurrent call if performance matters
-	//	// only return current level per call
-	//	objects, err := mcm.listObjects(ctx, bucketName, pre, false)
-	//	if err != nil {
-	//		return nil, nil, err
-	//	}
-	//
-	//	for object, contentLength := range objects {
-	//		// with tailing "/", object is a "directory"
-	//		if strings.HasSuffix(object, "/") && recursive {
-	//			// enqueue when recursive is true
-	//			if object != pre {
-	//				tasks.PushBack(object)
-	//			}
-	//			continue
-	//		}
-	//		objectsKeys = append(objectsKeys, object)
-	//		sizes = append(sizes, contentLength)
-	//	}
-	//}
-	//
-	//return objectsKeys, sizes, nil
 }
 
 func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectName string,
 	offset int64, size int64) (FileReader, error) {
@@ -395,7 +376,7 @@ func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectN
 	//}
 	//return resp.Body, nil
 
-	reader, err := mcm.client.GetObject(ctx, bucketName, objectName, offset, size)
+	reader, err := mcm.aos.GetObject(ctx, bucketName, objectName, offset, size)
 	switch err := err.(type) {
 	case *azcore.ResponseError:
 		if err.ErrorCode == string(bloberror.BlobNotFound) {
@@ -410,12 +391,12 @@ func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectN
 }
 
 func (mcm *AzureChunkManager) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {
-	err := mcm.client.PutObject(ctx, bucketName, objectName, reader, objectSize)
+	err := mcm.aos.PutObject(ctx, bucketName, objectName, reader, objectSize)
 	return err
 }
 
 func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, objectName string) (int64, error) {
-	info, err := mcm.client.StatObject(ctx, bucketName, objectName)
+	info, err := mcm.aos.StatObject(ctx, bucketName, objectName)
 	switch err := err.(type) {
 	case *azcore.ResponseError:
@@ -432,11 +413,11 @@ func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, obj
 }
 
 func (mcm *AzureChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) {
-	res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
+	res, err := mcm.aos.ListObjects(ctx, bucketName, prefix, recursive)
 	return res, err
 }
 
 func (mcm *AzureChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error {
-	err := mcm.client.RemoveObject(ctx, bucketName, objectName)
+	err := mcm.aos.RemoveObject(ctx, bucketName, objectName)
 	return err
 }
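Two behavior changes in ListWithPrefix above: listObjects is now always called recursively, and the non-recursive branch aggregates real sizes per top-level key instead of appending zeros. A self-contained sketch of that collapse step; keying on the first path segment after the prefix is an assumption here, since the surrounding "var key" logic is elided in this hunk:

package main

import (
	"fmt"
	"strings"
)

// collapse folds a recursive listing into one entry per top-level key under
// prefix, summing sizes per key the way sizesDict does in the patch.
func collapse(objects map[string]int64, prefix string) ([]string, []int64) {
	var keys []string
	sizesDict := make(map[string]int64)
	seen := make(map[string]bool)
	for object, size := range objects {
		rest := strings.Replace(object, prefix, "", 1)
		key := prefix + rest
		if idx := strings.Index(rest, "/"); idx >= 0 {
			key = prefix + rest[:idx+1] // keep the trailing "/" of a "directory"
		}
		if !seen[key] {
			keys = append(keys, key)
			seen[key] = true
		}
		sizesDict[key] += size
	}
	sizes := make([]int64, 0, len(keys))
	for _, k := range keys {
		sizes = append(sizes, sizesDict[k])
	}
	return keys, sizes
}

func main() {
	objs := map[string]int64{
		"backup/a/1.bin": 10,
		"backup/a/2.bin": 20,
		"backup/b.bin":   5,
	}
	keys, sizes := collapse(objs, "backup/")
	fmt.Println(keys, sizes) // e.g. [backup/a/ backup/b.bin] [30 5] (map order varies)
}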
fmt.Sprintf("https://%s.blob.core.windows.net", cfg.accessKeyID) +// cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil) +// if err != nil { +// return nil, fmt.Errorf("storage: new azure aos %w", err) +// } +// +// return cli, nil +//} + +func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObjectStorage, error) { + client, err := newAzureObjectClient(ctx, c.address, c.accessKeyID, c.secretAccessKeyID, c.bucketName, c.useIAM, c.createBucket) if err != nil { - return nil, fmt.Errorf("storage: new azure shared key credential %w", err) + return nil, err } - endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.accessKeyID) - cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil) + backupClient, err := newAzureObjectClient(ctx, c.address, c.backupAccessKeyID, c.backupSecretAccessKeyID, c.backupBucketName, c.useIAM, c.createBucket) if err != nil { - return nil, fmt.Errorf("storage: new azure client %w", err) + return nil, err } - - return cli, nil + clients := map[string]*innerAzureClient{ + c.bucketName: client, + c.backupBucketName: backupClient, + } + return &AzureObjectStorage{ + clients: clients, + //config: c, + }, nil } -func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObjectStorage, error) { +func newAzureObjectClient(ctx context.Context, address, accessKeyID, secretAccessKeyID, bucketName string, useIAM, createBucket bool) (*innerAzureClient, error) { var client *service.Client var err error - if c.useIAM { + if useIAM { cred, credErr := azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{ ClientID: os.Getenv("AZURE_CLIENT_ID"), TenantID: os.Getenv("AZURE_TENANT_ID"), @@ -68,29 +98,26 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje if credErr != nil { return nil, credErr } - client, err = service.NewClient("https://"+c.accessKeyID+".blob."+c.address+"/", cred, &service.ClientOptions{}) + client, err = service.NewClient("https://"+accessKeyID+".blob."+address+"/", cred, &service.ClientOptions{}) } else { - connectionString := os.Getenv("AZURE_STORAGE_CONNECTION_STRING") - if connectionString == "" { - connectionString = "DefaultEndpointsProtocol=https;AccountName=" + c.accessKeyID + - ";AccountKey=" + c.secretAccessKeyID + ";EndpointSuffix=" + c.address - } + connectionString := "DefaultEndpointsProtocol=https;AccountName=" + accessKeyID + + ";AccountKey=" + secretAccessKeyID + ";EndpointSuffix=" + address client, err = service.NewClientFromConnectionString(connectionString, &service.ClientOptions{}) } if err != nil { return nil, err } - if c.bucketName == "" { + if bucketName == "" { return nil, fmt.Errorf("invalid bucket name") } // check valid in first query checkBucketFn := func() error { - _, err := client.NewContainerClient(c.bucketName).GetProperties(ctx, &container.GetPropertiesOptions{}) + _, err := client.NewContainerClient(bucketName).GetProperties(ctx, &container.GetPropertiesOptions{}) if err != nil { switch err := err.(type) { case *azcore.ResponseError: - if c.createBucket && err.ErrorCode == string(bloberror.ContainerNotFound) { - _, createErr := client.NewContainerClient(c.bucketName).Create(ctx, &azblob.CreateContainerOptions{}) + if createBucket && err.ErrorCode == string(bloberror.ContainerNotFound) { + _, createErr := client.NewContainerClient(bucketName).Create(ctx, &azblob.CreateContainerOptions{}) if createErr != nil { return createErr } @@ -104,7 +131,17 @@ func 
@@ -104,7 +131,17 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje
 	if err != nil {
 		return nil, err
 	}
-	return &AzureObjectStorage{Client: client, config: c}, nil
+	return &innerAzureClient{
+		client:            client,
+		bucketName:        bucketName,
+		accessKeyID:       accessKeyID,
+		secretAccessKeyID: secretAccessKeyID,
+		createBucket:      createBucket,
+	}, nil
+}
+
+func (aos *AzureObjectStorage) getClient(ctx context.Context, bucketName string) *service.Client {
+	return aos.clients[bucketName].client
 }
 
 func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
@@ -115,7 +152,7 @@ func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, object
 			Count:  size,
 		}
 	}
-	object, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
+	object, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
 
 	if err != nil {
 		return nil, err
@@ -124,12 +161,12 @@ func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, object
 }
 
 func (aos *AzureObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {
-	_, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).UploadStream(ctx, reader, &azblob.UploadStreamOptions{})
+	_, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).UploadStream(ctx, reader, &azblob.UploadStreamOptions{})
 	return err
 }
 
 func (aos *AzureObjectStorage) StatObject(ctx context.Context, bucketName, objectName string) (int64, error) {
-	info, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).GetProperties(ctx, &blob.GetPropertiesOptions{})
+	info, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).GetProperties(ctx, &blob.GetPropertiesOptions{})
 	if err != nil {
 		return 0, err
 	}
@@ -137,7 +174,7 @@ func (aos *AzureObjectStorage) StatObject(ctx context.Context, bucketName, objec
 }
 
 func (aos *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) {
-	pager := aos.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
+	pager := aos.clients[bucketName].client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
 		Prefix: &prefix,
 	})
 	// pager := aos.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
@@ -158,12 +195,49 @@ func (aos *AzureObjectStorage) ListObjects(ctx context.Context, bucketName strin
 }
 
 func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {
-	_, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).Delete(ctx, &blob.DeleteOptions{})
+	_, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).Delete(ctx, &blob.DeleteOptions{})
 	return err
 }
 
 func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error {
-	fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.config.accessKeyID, fromBucketName, fromPath)
-	_, err := aos.Client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
-	return err
+	if aos.clients[fromBucketName].accessKeyID == aos.clients[toBucketName].accessKeyID {
+		fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
+		_, err := aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
+		return err
+	} else {
+		srcSAS, err := aos.getSAS(fromBucketName)
+		if err != nil {
+			return err
+		}
+		fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
+		_, err = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
+		return err
+	}
+}
+
+func (aos *AzureObjectStorage) getSAS(bucket string) (*sas.QueryParameters, error) {
+	srcSvcCli := aos.clients[bucket].client
+	// Set current and past time and create key
+	now := time.Now().UTC().Add(-10 * time.Second)
+	expiry := now.Add(48 * time.Hour)
+	info := service.KeyInfo{
+		Start:  to.Ptr(now.UTC().Format(sas.TimeFormat)),
+		Expiry: to.Ptr(expiry.UTC().Format(sas.TimeFormat)),
+	}
+	udc, err := srcSvcCli.GetUserDelegationCredential(context.Background(), info, nil)
+	if err != nil {
+		return nil, err
+	}
+	// Create Blob Signature Values with desired permissions and sign with user delegation credential
+	sasQueryParams, err := sas.BlobSignatureValues{
+		Protocol:      sas.ProtocolHTTPS,
+		StartTime:     time.Now().UTC().Add(time.Second * -10),
+		ExpiryTime:    time.Now().UTC().Add(time.Duration(sasSignMinute * time.Minute)),
+		Permissions:   to.Ptr(sas.ContainerPermissions{Read: true, List: true}).String(),
+		ContainerName: bucket,
+	}.SignWithUserDelegation(udc)
+	if err != nil {
+		return nil, err
+	}
+	return &sasQueryParams, nil
 }
diff --git a/core/storage/chunk_manager.go b/core/storage/chunk_manager.go
index cbd6158d..7f7b995a 100644
--- a/core/storage/chunk_manager.go
+++ b/core/storage/chunk_manager.go
@@ -51,12 +51,18 @@ func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.Backu
 	c.accessKeyID = params.MinioCfg.AccessKeyID
 	c.secretAccessKeyID = params.MinioCfg.SecretAccessKey
 	c.useSSL = params.MinioCfg.UseSSL
-	c.bucketName = params.MinioCfg.BackupBucketName
+	c.bucketName = params.MinioCfg.BucketName
 	c.rootPath = params.MinioCfg.RootPath
 	c.cloudProvider = params.MinioCfg.CloudProvider
 	c.storageEngine = params.MinioCfg.StorageType
 	c.useIAM = params.MinioCfg.UseIAM
 	c.iamEndpoint = params.MinioCfg.IAMEndpoint
 	c.createBucket = true
+
+	c.backupAccessKeyID = params.MinioCfg.BackupAccessKeyID
+	c.backupSecretAccessKeyID = params.MinioCfg.BackupSecretAccessKey
+	c.backupBucketName = params.MinioCfg.BackupBucketName
+	c.backupRootPath = params.MinioCfg.BackupRootPath
+
 	return NewAzureChunkManager(ctx, c)
 }
diff --git a/core/storage/options.go b/core/storage/options.go
index 2b99a5db..ea1292f2 100644
--- a/core/storage/options.go
+++ b/core/storage/options.go
@@ -11,9 +11,14 @@ type config struct {
 	rootPath    string
 	useIAM      bool
 	iamEndpoint string
-
+	
 	cloudProvider string
 	storageEngine string
+
+	backupAccessKeyID       string
+	backupSecretAccessKeyID string
+	backupBucketName        string
+	backupRootPath          string
 }
 
 func newDefaultConfig() *config {
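Back in azure_object_storage.go, CopyObject now branches on whether the source and destination buckets share a storage account: the same-account path keeps the plain StartCopyFromURL, while the cross-account path signs the source URL with a user-delegation SAS so the destination account is allowed to read it. A compiling sketch of the signing step under stated assumptions — account/container names are placeholders, and a flat one-hour validity replaces the patch's sasSignMinute/48-hour split:

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
)

// signedSourceURL returns a source-blob URL carrying a short-lived
// user-delegation SAS, suitable as the copy source for StartCopyFromURL.
func signedSourceURL(ctx context.Context, srcSvc *service.Client, account, container, blobPath string) (string, error) {
	start := time.Now().UTC().Add(-10 * time.Second) // backdate to absorb clock skew
	expiry := start.Add(time.Hour)

	// A user delegation key scoped to the SAS validity window.
	udc, err := srcSvc.GetUserDelegationCredential(ctx, service.KeyInfo{
		Start:  to.Ptr(start.Format(sas.TimeFormat)),
		Expiry: to.Ptr(expiry.Format(sas.TimeFormat)),
	}, nil)
	if err != nil {
		return "", err
	}

	// Container-scoped, read+list only, signed with the delegation key.
	qp, err := sas.BlobSignatureValues{
		Protocol:      sas.ProtocolHTTPS,
		StartTime:     start,
		ExpiryTime:    expiry,
		Permissions:   to.Ptr(sas.ContainerPermissions{Read: true, List: true}).String(),
		ContainerName: container,
	}.SignWithUserDelegation(udc)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", account, container, blobPath, qp.Encode()), nil
}

One caveat worth flagging: GetUserDelegationCredential only works for clients authenticated via Azure AD (the useIAM path); a client built from a shared-key connection string cannot mint a user-delegation key and would need a service SAS instead.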
diff --git a/core/storage/types.go b/core/storage/types.go
index bfc62191..bce6b329 100644
--- a/core/storage/types.go
+++ b/core/storage/types.go
@@ -16,7 +16,8 @@ type FileReader interface {
 type ChunkManager interface {
 	// RootPath returns current root path.
 	// Useless in backup tool
-	RootPath() string
+	// RootPath() string
+
 	// Path returns path of @filePath.
 	Path(ctx context.Context, bucketName string, filePath string) (string, error)
 	// Size returns path of @filePath.
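A final caveat on the bucket-keyed client map introduced in azure_object_storage.go: aos.clients[bucketName] yields nil for any bucket other than the two configured at construction, so the chained .client access panics rather than returning an error; the new getClient helper has the same property and does not appear to be called anywhere in this diff. A guarded variant, sketched with pared-down stand-in types rather than the patch's own:

package example

import (
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
)

type innerAzureClient struct {
	client *service.Client
}

type azureObjectStorage struct {
	clients map[string]*innerAzureClient
}

// getClient returns an explicit error instead of panicking on an unknown bucket.
func (aos *azureObjectStorage) getClient(bucketName string) (*service.Client, error) {
	c, ok := aos.clients[bucketName]
	if !ok {
		return nil, fmt.Errorf("storage: no azure client configured for bucket %q", bucketName)
	}
	return c.client, nil
}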