diff --git a/configs/backup.yaml b/configs/backup.yaml index 8c8767b6..dad734d4 100644 --- a/configs/backup.yaml +++ b/configs/backup.yaml @@ -34,6 +34,10 @@ minio: bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance rootPath: "files" # Milvus storage root path in MinIO/S3, make it the same as your milvus instance + # only for azure + backupAccessKeyID: minioadmin # accessKeyID of MinIO/S3 + backupSecretAccessKey: minioadmin # MinIO/S3 encryption string + backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data will store to backupBucketName/backupRootPath backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go index 82ed1249..b70aad10 100644 --- a/core/backup_impl_create_backup.go +++ b/core/backup_impl_create_backup.go @@ -521,6 +521,9 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup } err = b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())) + if err != nil { + return err + } b.refreshBackupCache(backupInfo) collectionBackup.Size = collectionBackupSize diff --git a/core/paramtable/params.go b/core/paramtable/params.go index 4befad13..fea3c5c7 100644 --- a/core/paramtable/params.go +++ b/core/paramtable/params.go @@ -165,8 +165,10 @@ type MinioConfig struct { CloudProvider string IAMEndpoint string - BackupBucketName string - BackupRootPath string + BackupAccessKeyID string + BackupSecretAccessKey string + BackupBucketName string + BackupRootPath string StorageType string } @@ -186,6 +188,8 @@ func (p *MinioConfig) init(base *BaseTable) { p.initCloudProvider() p.initIAMEndpoint() + p.initBackupAccessKeyID() + p.initBackupSecretAccessKey() p.initBackupBucketName() p.initBackupRootPath() } @@ -246,6 +250,16 @@ func (p *MinioConfig) initIAMEndpoint() { p.IAMEndpoint = iamEndpoint } +func (p *MinioConfig) initBackupAccessKeyID() { + keyID := p.Base.LoadWithDefault("minio.backupAccessKeyID", DefaultMinioAccessKey) + p.BackupAccessKeyID = keyID +} + +func (p *MinioConfig) initBackupSecretAccessKey() { + key := p.Base.LoadWithDefault("minio.backupSecretAccessKey", DefaultMinioSecretAccessKey) + p.BackupSecretAccessKey = key +} + func (p *MinioConfig) initBackupBucketName() { bucketName := p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName) p.BackupBucketName = bucketName diff --git a/core/storage/azure_chunk_manager.go b/core/storage/azure_chunk_manager.go index 9d774cf9..4d2923bc 100644 --- a/core/storage/azure_chunk_manager.go +++ b/core/storage/azure_chunk_manager.go @@ -37,18 +37,18 @@ import ( // AzureChunkManager is responsible for read and write data stored in minio. type AzureChunkManager struct { - client *AzureObjectStorage + aos *AzureObjectStorage //cli *azblob.Client // ctx context.Context - bucketName string - rootPath string + //bucketName string + //rootPath string } var _ ChunkManager = (*AzureChunkManager)(nil) func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, error) { - client, err := newAzureObjectStorageWithConfig(ctx, c) + aos, err := newAzureObjectStorageWithConfig(ctx, c) if err != nil { return nil, err } @@ -58,19 +58,19 @@ func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, e // return nil, err //} mcm := &AzureChunkManager{ - client: client, + aos: aos, //cli: cli, - bucketName: c.bucketName, - rootPath: strings.TrimLeft(c.rootPath, "/"), + //bucketName: c.bucketName, + //rootPath: strings.TrimLeft(c.rootPath, "/"), } - log.Info("Azure chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", mcm.RootPath())) + log.Info("Azure chunk manager init success.") return mcm, nil } // RootPath returns minio root path. -func (mcm *AzureChunkManager) RootPath() string { - return mcm.rootPath -} +//func (mcm *AzureChunkManager) RootPath() string { +// return mcm.rootPath +//} func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error { objectkeys, _, err := mcm.ListWithPrefix(ctx, fromBucketName, fromPath, true) @@ -80,7 +80,7 @@ func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, t } for _, objectkey := range objectkeys { dstObjectKey := strings.Replace(objectkey, fromPath, toPath, 1) - err := mcm.client.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey) + err := mcm.aos.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey) if err != nil { log.Error("copyObject error", zap.String("srcObjectKey", objectkey), zap.String("dstObjectKey", dstObjectKey), zap.Error(err)) return err @@ -148,12 +148,12 @@ func (mcm *AzureChunkManager) MultiWrite(ctx context.Context, bucketName string, // Exist checks whether chunk is saved to minio storage. func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) { - _, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) + _, err := mcm.getObjectSize(ctx, bucketName, filePath) if err != nil { if IsErrNoSuchKey(err) { return false, nil } - log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) return false, err } return true, nil @@ -163,7 +163,7 @@ func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, file func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, filePath string) ([]byte, error) { object, err := mcm.getObject(ctx, bucketName, filePath, int64(0), int64(0)) if err != nil { - log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + log.Warn("failed to get object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) return nil, err } defer object.Close() @@ -179,7 +179,7 @@ func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, fileP log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) return nil, err } - size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) + size, err := mcm.getObjectSize(ctx, bucketName, filePath) if err != nil { log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) return nil, err @@ -395,7 +395,7 @@ func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectN //} //return resp.Body, nil - reader, err := mcm.client.GetObject(ctx, bucketName, objectName, offset, size) + reader, err := mcm.aos.GetObject(ctx, bucketName, objectName, offset, size) switch err := err.(type) { case *azcore.ResponseError: if err.ErrorCode == string(bloberror.BlobNotFound) { @@ -410,12 +410,12 @@ func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectN } func (mcm *AzureChunkManager) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error { - err := mcm.client.PutObject(ctx, bucketName, objectName, reader, objectSize) + err := mcm.aos.PutObject(ctx, bucketName, objectName, reader, objectSize) return err } func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, objectName string) (int64, error) { - info, err := mcm.client.StatObject(ctx, bucketName, objectName) + info, err := mcm.aos.StatObject(ctx, bucketName, objectName) switch err := err.(type) { case *azcore.ResponseError: @@ -432,11 +432,11 @@ func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, obj } func (mcm *AzureChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) { - res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive) + res, err := mcm.aos.ListObjects(ctx, bucketName, prefix, recursive) return res, err } func (mcm *AzureChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error { - err := mcm.client.RemoveObject(ctx, bucketName, objectName) + err := mcm.aos.RemoveObject(ctx, bucketName, objectName) return err } diff --git a/core/storage/azure_object_storage.go b/core/storage/azure_object_storage.go index 6bed8124..580f0366 100644 --- a/core/storage/azure_object_storage.go +++ b/core/storage/azure_object_storage.go @@ -33,33 +33,58 @@ import ( "github.com/zilliztech/milvus-backup/internal/util/retry" ) -type AzureObjectStorage struct { - Client *service.Client - config *config +type innerAzureClient struct { + client *service.Client + + bucketName string + accessKeyID string + secretAccessKeyID string + createBucket bool } -type AzureClient struct { - cli *azblob.Client +type AzureObjectStorage struct { + //Client *service.Client + clients map[string]*innerAzureClient + //config *config } -func NewAzureClient(ctx context.Context, cfg *config) (*azblob.Client, error) { - cred, err := azblob.NewSharedKeyCredential(cfg.accessKeyID, cfg.secretAccessKeyID) +//func NewAzureClient(ctx context.Context, cfg *config) (*azblob.Client, error) { +// cred, err := azblob.NewSharedKeyCredential(cfg.accessKeyID, cfg.secretAccessKeyID) +// if err != nil { +// return nil, fmt.Errorf("storage: new azure shared key credential %w", err) +// } +// endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.accessKeyID) +// cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil) +// if err != nil { +// return nil, fmt.Errorf("storage: new azure aos %w", err) +// } +// +// return cli, nil +//} + +func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObjectStorage, error) { + client, err := newAzureObjectClient(ctx, c.address, c.accessKeyID, c.secretAccessKeyID, c.bucketName, c.useIAM, c.createBucket) if err != nil { - return nil, fmt.Errorf("storage: new azure shared key credential %w", err) + return nil, err } - endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.accessKeyID) - cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil) + backupClient, err := newAzureObjectClient(ctx, c.address, c.backupAccessKeyID, c.backupSecretAccessKeyID, c.backupBucketName, c.useIAM, c.createBucket) if err != nil { - return nil, fmt.Errorf("storage: new azure client %w", err) + return nil, err } - - return cli, nil + clients := map[string]*innerAzureClient{ + c.bucketName: client, + c.backupBucketName: backupClient, + } + return &AzureObjectStorage{ + clients: clients, + //config: c, + }, nil } -func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObjectStorage, error) { +func newAzureObjectClient(ctx context.Context, address, accessKeyID, secretAccessKeyID, bucketName string, useIAM, createBucket bool) (*innerAzureClient, error) { var client *service.Client var err error - if c.useIAM { + if useIAM { cred, credErr := azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{ ClientID: os.Getenv("AZURE_CLIENT_ID"), TenantID: os.Getenv("AZURE_TENANT_ID"), @@ -68,29 +93,26 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje if credErr != nil { return nil, credErr } - client, err = service.NewClient("https://"+c.accessKeyID+".blob."+c.address+"/", cred, &service.ClientOptions{}) + client, err = service.NewClient("https://"+accessKeyID+".blob."+address+"/", cred, &service.ClientOptions{}) } else { - connectionString := os.Getenv("AZURE_STORAGE_CONNECTION_STRING") - if connectionString == "" { - connectionString = "DefaultEndpointsProtocol=https;AccountName=" + c.accessKeyID + - ";AccountKey=" + c.secretAccessKeyID + ";EndpointSuffix=" + c.address - } + connectionString := "DefaultEndpointsProtocol=https;AccountName=" + accessKeyID + + ";AccountKey=" + secretAccessKeyID + ";EndpointSuffix=" + address client, err = service.NewClientFromConnectionString(connectionString, &service.ClientOptions{}) } if err != nil { return nil, err } - if c.bucketName == "" { + if bucketName == "" { return nil, fmt.Errorf("invalid bucket name") } // check valid in first query checkBucketFn := func() error { - _, err := client.NewContainerClient(c.bucketName).GetProperties(ctx, &container.GetPropertiesOptions{}) + _, err := client.NewContainerClient(bucketName).GetProperties(ctx, &container.GetPropertiesOptions{}) if err != nil { switch err := err.(type) { case *azcore.ResponseError: - if c.createBucket && err.ErrorCode == string(bloberror.ContainerNotFound) { - _, createErr := client.NewContainerClient(c.bucketName).Create(ctx, &azblob.CreateContainerOptions{}) + if createBucket && err.ErrorCode == string(bloberror.ContainerNotFound) { + _, createErr := client.NewContainerClient(bucketName).Create(ctx, &azblob.CreateContainerOptions{}) if createErr != nil { return createErr } @@ -104,7 +126,17 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje if err != nil { return nil, err } - return &AzureObjectStorage{Client: client, config: c}, nil + return &innerAzureClient{ + client: client, + bucketName: bucketName, + accessKeyID: accessKeyID, + secretAccessKeyID: secretAccessKeyID, + createBucket: createBucket, + }, nil +} + +func (aos *AzureObjectStorage) getClient(ctx context.Context, bucketName string) *service.Client { + return aos.clients[bucketName].client } func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) { @@ -115,7 +147,7 @@ func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, object Count: size, } } - object, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts) + object, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts) if err != nil { return nil, err @@ -124,12 +156,12 @@ func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, object } func (aos *AzureObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error { - _, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).UploadStream(ctx, reader, &azblob.UploadStreamOptions{}) + _, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).UploadStream(ctx, reader, &azblob.UploadStreamOptions{}) return err } func (aos *AzureObjectStorage) StatObject(ctx context.Context, bucketName, objectName string) (int64, error) { - info, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).GetProperties(ctx, &blob.GetPropertiesOptions{}) + info, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).GetProperties(ctx, &blob.GetPropertiesOptions{}) if err != nil { return 0, err } @@ -137,7 +169,7 @@ func (aos *AzureObjectStorage) StatObject(ctx context.Context, bucketName, objec } func (aos *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) { - pager := aos.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{ + pager := aos.clients[bucketName].client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{ Prefix: &prefix, }) // pager := aos.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ @@ -158,12 +190,12 @@ func (aos *AzureObjectStorage) ListObjects(ctx context.Context, bucketName strin } func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error { - _, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).Delete(ctx, &blob.DeleteOptions{}) + _, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).Delete(ctx, &blob.DeleteOptions{}) return err } func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error { - fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.config.accessKeyID, fromBucketName, fromPath) - _, err := aos.Client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil) + fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath) + _, err := aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil) return err } diff --git a/core/storage/chunk_manager.go b/core/storage/chunk_manager.go index cbd6158d..7f7b995a 100644 --- a/core/storage/chunk_manager.go +++ b/core/storage/chunk_manager.go @@ -51,12 +51,18 @@ func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.Backu c.accessKeyID = params.MinioCfg.AccessKeyID c.secretAccessKeyID = params.MinioCfg.SecretAccessKey c.useSSL = params.MinioCfg.UseSSL - c.bucketName = params.MinioCfg.BackupBucketName + c.bucketName = params.MinioCfg.BucketName c.rootPath = params.MinioCfg.RootPath c.cloudProvider = params.MinioCfg.CloudProvider c.storageEngine = params.MinioCfg.StorageType c.useIAM = params.MinioCfg.UseIAM c.iamEndpoint = params.MinioCfg.IAMEndpoint c.createBucket = true + + c.backupAccessKeyID = params.MinioCfg.BackupAccessKeyID + c.backupSecretAccessKeyID = params.MinioCfg.BackupSecretAccessKey + c.backupBucketName = params.MinioCfg.BackupBucketName + c.backupRootPath = params.MinioCfg.BackupRootPath + return NewAzureChunkManager(ctx, c) } diff --git a/core/storage/options.go b/core/storage/options.go index 2b99a5db..ea1292f2 100644 --- a/core/storage/options.go +++ b/core/storage/options.go @@ -11,9 +11,14 @@ type config struct { rootPath string useIAM bool iamEndpoint string - + cloudProvider string storageEngine string + + backupAccessKeyID string + backupSecretAccessKeyID string + backupBucketName string + backupRootPath string } func newDefaultConfig() *config { diff --git a/core/storage/types.go b/core/storage/types.go index bfc62191..bce6b329 100644 --- a/core/storage/types.go +++ b/core/storage/types.go @@ -16,7 +16,8 @@ type FileReader interface { type ChunkManager interface { // RootPath returns current root path. // Useless in backup tool - RootPath() string + // RootPath() string + // Path returns path of @filePath. Path(ctx context.Context, bucketName string, filePath string) (string, error) // Size returns path of @filePath.