Skip to content
This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

Commit

Permalink
storage: support azure ad auth remote copy
Browse files Browse the repository at this point in the history
Signed-off-by: huanghaoyuan <[email protected]>
  • Loading branch information
huanghaoyuanhhy committed Oct 24, 2023
1 parent 6581e9f commit 6f4c40c
Showing 1 changed file with 61 additions and 4 deletions.
65 changes: 61 additions & 4 deletions storage/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,34 @@ package storage
import (
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"os"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
)

type AzureClient struct {
account string

cli *azblob.Client

useIAM bool

// sasCli is used to generate SAS token.
// When we want to copy object under two different service accounts, AD auth is not supported.
// So we need to use AD auth to generate SAS token and use SAS token to copy object.
sasCli *service.Client
}

func NewAzureClient(cfg Cfg) (*AzureClient, error) {
endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.AK)
var cli *azblob.Client
var sasCli *service.Client
if cfg.UseIAM {
cred, err := azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{
ClientID: os.Getenv("AZURE_CLIENT_ID"),
Expand All @@ -32,6 +44,10 @@ func NewAzureClient(cfg Cfg) (*AzureClient, error) {
if err != nil {
return nil, fmt.Errorf("storage: new azure client %w", err)
}
sasCli, err = service.NewClient(endpoint, cred, nil)
if err != nil {
return nil, fmt.Errorf("storage: new azure service client %w", err)
}
} else {
cred, err := azblob.NewSharedKeyCredential(cfg.AK, cfg.SK)
if err != nil {
Expand All @@ -41,21 +57,32 @@ func NewAzureClient(cfg Cfg) (*AzureClient, error) {
if err != nil {
return nil, fmt.Errorf("storage: new azure client %w", err)
}
// sasCli is not used when use shared key auth
}

return &AzureClient{account: cfg.AK, cli: cli}, nil
return &AzureClient{account: cfg.AK, useIAM: cfg.UseIAM, cli: cli, sasCli: sasCli}, nil
}

func (a *AzureClient) CopyObject(ctx context.Context, i CopyObjectInput) error {
srcCli, ok := i.SrcCli.(*AzureClient)
if !ok {
return fmt.Errorf("storage: azure copy object dest client is not azure client")
}
var url string
// When we want to copy object under two different service accounts, AD auth is not supported.
if srcCli.useIAM && (srcCli.account != a.account) {
queryParam, err := a.getSAS(i)
if err != nil {
return fmt.Errorf("storage: azure get sas %w", err)
}
url = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", srcCli.account, i.SrcBucket, i.SrcKey, queryParam.Encode())
} else {
url = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", srcCli.account, i.SrcBucket, i.SrcKey)
}

url := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", srcCli.account, i.SrcBucket, i.SrcKey)
_, err := a.cli.ServiceClient().
NewContainerClient(i.DestBucket).
NewBlobClient(i.DestKey).
NewBlockBlobClient(i.DestKey).
StartCopyFromURL(ctx, url, nil)
if err != nil {
return fmt.Errorf("storage: azure start copy from url %w", err)
Expand All @@ -64,6 +91,36 @@ func (a *AzureClient) CopyObject(ctx context.Context, i CopyObjectInput) error {
return nil
}

func (a *AzureClient) getSAS(i CopyObjectInput) (sas.QueryParameters, error) {
srcCli, ok := i.SrcCli.(*AzureClient)
if !ok {
return sas.QueryParameters{}, fmt.Errorf("storage: azure copy object dest client is not azure client")
}

now := time.Now().UTC().Add(-10 * time.Second)
expiry := now.Add(48 * time.Hour)
info := service.KeyInfo{
Start: to.Ptr(now.UTC().Format(sas.TimeFormat)),
Expiry: to.Ptr(expiry.UTC().Format(sas.TimeFormat)),
}
udc, err := srcCli.sasCli.GetUserDelegationCredential(context.Background(), info, nil)
if err != nil {
return sas.QueryParameters{}, fmt.Errorf("storage: azure get user delegation credential %w", err)
}
sasQueryParams, err := sas.BlobSignatureValues{
Protocol: sas.ProtocolHTTPS,
StartTime: time.Now().UTC().Add(time.Second * -10),
ExpiryTime: time.Now().UTC().Add(60 * time.Minute),
Permissions: to.Ptr(sas.ContainerPermissions{Read: true, List: true}).String(),
ContainerName: i.SrcBucket,
}.SignWithUserDelegation(udc)
if err != nil {
return sas.QueryParameters{}, fmt.Errorf("storage: azure sign with user delegation %w", err)
}

return sasQueryParams, nil
}

func (a *AzureClient) HeadBucket(ctx context.Context, bucket string) error {
page := a.cli.NewListContainersPager(&azblob.ListContainersOptions{Prefix: &bucket})
for page.More() {
Expand Down

0 comments on commit 6f4c40c

Please sign in to comment.