Skip to content

Commit

Permalink
Support azure (zilliztech#212)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored and zhuwenxing committed Feb 21, 2024
1 parent fe7525b commit 8859b4b
Show file tree
Hide file tree
Showing 16 changed files with 784 additions and 55 deletions.
5 changes: 3 additions & 2 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ milvus:

# Related configuration of minio, which is responsible for data persistence for Milvus.
minio:
cloudProvider: "minio" # remote cloud storage provider: s3, gcp, aliyun, azure

address: localhost # Address of MinIO/S3
port: 9000 # Port of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
secretAccessKey: minioadmin # MinIO/S3 encryption string
useSSL: false # Access to MinIO/S3 with SSL
useIAM: false
cloudProvider: "aws"
iamEndpoint: ""

bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
Expand Down
13 changes: 1 addition & 12 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,7 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s
zap.String("address", minioEndPoint),
zap.String("bucket", params.MinioCfg.BucketName),
zap.String("backupBucket", params.MinioCfg.BackupBucketName))
minioClient, err := storage.NewMinioChunkManager(ctx,
storage.Address(minioEndPoint),
storage.AccessKeyID(params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(params.MinioCfg.SecretAccessKey),
storage.UseSSL(params.MinioCfg.UseSSL),
storage.BucketName(params.MinioCfg.BackupBucketName),
storage.RootPath(params.MinioCfg.RootPath),
storage.CloudProvider(params.MinioCfg.CloudProvider),
storage.UseIAM(params.MinioCfg.UseIAM),
storage.IAMEndpoint(params.MinioCfg.IAMEndpoint),
storage.CreateBucket(true),
)
minioClient, err := storage.NewChunkManager(ctx, params)
return minioClient, err
}

Expand Down
2 changes: 1 addition & 1 deletion core/backup_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestListBackups(t *testing.T) {
assert.Equal(t, backupLists.GetCode(), backuppb.ResponseCode_Success)

backupListsWithCollection := backupContext.ListBackups(context, &backuppb.ListBackupsRequest{
CollectionName: "hello_milvus",
//CollectionName: "hello_milvus",
})

for _, backup := range backupListsWithCollection.GetData() {
Expand Down
20 changes: 10 additions & 10 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,9 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
if strings.Contains(err.Error(), "index not found") ||
strings.HasPrefix(err.Error(), "index doesn't exist") {
// todo
log.Warn("field has no index",
log.Info("field has no index",
zap.String("collection_name", completeCollection.Name),
zap.String("field_name", field.Name),
zap.Error(err))
zap.String("field_name", field.Name))
continue
} else {
log.Error("fail in DescribeIndex", zap.Error(err))
Expand Down Expand Up @@ -729,30 +728,31 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S
return nil
}

func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) {
func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) {
segmentBackupInfo := backuppb.SegmentBackupInfo{
SegmentId: segmentID,
CollectionId: collecitonID,
CollectionId: collectionID,
PartitionId: partitionID,
NumOfRows: numOfRows,
}
var size int64 = 0
var rootPath string

if b.params.MinioCfg.RootPath != "" {
log.Debug("params.MinioCfg.RootPath", zap.String("params.MinioCfg.RootPath", b.params.MinioCfg.RootPath))
rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath)
} else {
rootPath = ""
}

insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collecitonID, partitionID, segmentID)
log.Debug("insertPath", zap.String("insertPath", insertPath))
insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collectionID, partitionID, segmentID)
log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
if err != nil {
log.Error("Fail to list segment path", zap.String("insertPath", insertPath), zap.Error(err))
return &segmentBackupInfo, err
}
log.Debug("fieldsLogDir", zap.Any("fieldsLogDir", fieldsLogDir))
log.Debug("fieldsLogDir", zap.String("bucket", b.milvusBucketName), zap.Any("fieldsLogDir", fieldsLogDir))
insertLogs := make([]*backuppb.FieldBinlog, 0)
for _, fieldLogDir := range fieldsLogDir {
binlogPaths, sizes, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, fieldLogDir, false)
Expand All @@ -772,7 +772,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
})
}

deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collecitonID, partitionID, segmentID)
deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collectionID, partitionID, segmentID)
deltaFieldsLogDir, _, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaLogPath, false)
deltaLogs := make([]*backuppb.FieldBinlog, 0)
for _, deltaFieldLogDir := range deltaFieldsLogDir {
Expand All @@ -798,7 +798,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
})
}

//statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collecitonID, partitionID, segmentID)
//statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collectionID, partitionID, segmentID)
//statsFieldsLogDir, _, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsLogPath, false)
//statsLogs := make([]*backuppb.FieldBinlog, 0)
//for _, statsFieldLogDir := range statsFieldsLogDir {
Expand Down
2 changes: 2 additions & 0 deletions core/paramtable/base_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
DefaultMinioBackupBucketName = "a-bucket"
DefaultMinioBackupRootPath = "backup"

DefaultStorageType = "minio"

DefaultMilvusAddress = "localhost"
DefaultMilvusPort = "19530"
DefaultMilvusAuthorizationEnabled = "false"
Expand Down
21 changes: 17 additions & 4 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,19 @@ func (p *MilvusConfig) initTLSMode() {
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
const (
Minio = "minio"
CloudProviderAWS = "aws"
CloudProviderGCP = "gcp"
CloudProviderAliyun = "ali"
CloudProviderAzure = "azure"
)

var supportedCloudProvider = map[string]bool{
Minio: true,
CloudProviderAWS: true,
CloudProviderGCP: true,
CloudProviderAliyun: true,
CloudProviderAzure: true,
}

type MinioConfig struct {
Expand All @@ -141,11 +145,14 @@ type MinioConfig struct {

BackupBucketName string
BackupRootPath string

StorageType string
}

func (p *MinioConfig) init(base *BaseTable) {
p.Base = base

p.initStorageType()
p.initAddress()
p.initPort()
p.initAccessKeyID()
Expand All @@ -172,10 +179,7 @@ func (p *MinioConfig) initPort() {
}

func (p *MinioConfig) initAccessKeyID() {
keyID, err := p.Base.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
keyID := p.Base.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey)
p.AccessKeyID = keyID
}

Expand Down Expand Up @@ -230,6 +234,15 @@ func (p *MinioConfig) initBackupRootPath() {
p.BackupRootPath = rootPath
}

func (p *MinioConfig) initStorageType() {
engine := p.Base.LoadWithDefault("storage.type",
p.Base.LoadWithDefault("minio.type", DefaultStorageType))
if !supportedCloudProvider[engine] {
panic("unsupported storage type:" + engine)
}
p.StorageType = engine
}

type HTTPConfig struct {
Base *BaseTable

Expand Down
15 changes: 15 additions & 0 deletions core/paramtable/params_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package paramtable

import (
"testing"
)

func TestRootPathParams(t *testing.T) {
var params BackupParams
params.GlobalInitWithYaml("backup.yaml")
params.Init()

//cfg := &MinioConfig{}
//cfg.initRootPath()
println(params.MinioCfg.RootPath)
}
Loading

0 comments on commit 8859b4b

Please sign in to comment.