This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

Commit

storage: impl local copy process record
Signed-off-by: huanghaoyuan <[email protected]>
huanghaoyuanhhy committed Nov 7, 2023
1 parent 0a141e8 commit 0562a87
Showing 1 changed file with 50 additions and 14 deletions.
64 changes: 50 additions & 14 deletions storage/copier.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"sync/atomic"

"golang.org/x/time/rate"
)
@@ -59,6 +60,12 @@ type Copier struct {
bufSizeBytePerWorker int
useRemoteCopy bool
rps int32

totalSize atomic.Uint64
totalCnt atomic.Uint64

size atomic.Uint64
cnt atomic.Uint64
}

func NewCopier(src, dest Client, opt CopyOption) *Copier {
@@ -81,9 +88,22 @@ func NewCopier(src, dest Client, opt CopyOption) *Copier {
}
}

type Total struct {
Length int64
Count int64
type Process struct {
TotalSize uint64
TotalCnt uint64

Size uint64
Cnt uint64
}

func (c *Copier) Process() Process {
return Process{
TotalSize: c.totalSize.Load(),
TotalCnt: c.totalCnt.Load(),

Size: c.size.Load(),
Cnt: c.cnt.Load(),
}
}

type CopyPathInput struct {
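
A hedged caller-side sketch: the Process snapshot returned above is intended to be read from another goroutine while a copy is running, and the atomic counters make that safe without extra locking. The helper name reportProgress and the one-second interval are illustrative assumptions, not part of the commit; the sketch assumes it lives in the same package as Copier and imports "context", "fmt", and "time".

// reportProgress polls Copier.Process and prints bytes copied so far
// versus the total collected during listing. Illustrative only.
func reportProgress(ctx context.Context, c *Copier) {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			p := c.Process()
			if p.TotalSize == 0 {
				continue // listing has not populated the totals yet
			}
			fmt.Printf("copied %d/%d bytes (%.1f%%)\n",
				p.Size, p.TotalSize, float64(p.Size)/float64(p.TotalSize)*100)
		}
	}
}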
@@ -95,36 +115,35 @@ type CopyPathInput struct {

// OnSuccess is called when an object is copied successfully.
// It may be executed concurrently, so pay attention to thread safety.
OnSuccess func(attr ObjectAttr, total Total)
OnSuccess func(attr ObjectAttr)
}

// getAttrs gets all object attrs under bucket/prefix
func (c *Copier) getAttrs(ctx context.Context, bucket, prefix string) ([]ObjectAttr, Total, error) {
func (c *Copier) getAttrs(ctx context.Context, bucket, prefix string) ([]ObjectAttr, error) {
var attrs []ObjectAttr
var length int64
var count int64

p := c.src.ListObjectsPage(ctx, ListObjectPageInput{Bucket: bucket, Prefix: prefix})
for p.HasMorePages() {
page, err := p.NextPage(ctx)
if err != nil {
return nil, Total{}, fmt.Errorf("storage: copier list objects %w", err)
return nil, fmt.Errorf("storage: copier list objects %w", err)
}
for _, attr := range page.Contents {
if attr.IsEmpty() {
continue
}
attrs = append(attrs, attr)
length += attr.Length
count++
c.totalSize.Add(uint64(attr.Length))
c.cnt.Add(1)
}
}

return attrs, Total{Length: length, Count: count}, nil
return attrs, nil
}

// CopyPrefix copies all files under the src path
func (c *Copier) CopyPrefix(ctx context.Context, i CopyPathInput) error {
srcAttrs, total, err := c.getAttrs(ctx, i.SrcBucket, i.SrcPrefix)
srcAttrs, err := c.getAttrs(ctx, i.SrcBucket, i.SrcPrefix)
if err != nil {
return fmt.Errorf("storage: copier get src attrs %w", err)
}
@@ -146,8 +165,10 @@ func (c *Copier) CopyPrefix(ctx context.Context, i CopyPathInput) error {
}
}
if i.OnSuccess != nil {
i.OnSuccess(attr, total)
i.OnSuccess(attr)
}
c.cnt.Add(1)

return nil
}
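
A usage sketch for the callback, under stated assumptions: copyWithLogging is a hypothetical helper in the same package, only the source-side fields of CopyPathInput are filled in (the destination fields are not shown in this diff), and it imports "context", "log", and "sync/atomic". Because OnSuccess may run concurrently across copy workers, the sketch keeps its own tally in an atomic counter rather than a plain variable.

// copyWithLogging copies a prefix and logs each successfully copied object.
// Illustrative only; not part of the commit.
func copyWithLogging(ctx context.Context, c *Copier, srcBucket, srcPrefix string) error {
	var copied atomic.Uint64
	return c.CopyPrefix(ctx, CopyPathInput{
		SrcBucket: srcBucket,
		SrcPrefix: srcPrefix,
		// destination fields omitted; they are not part of the shown hunks
		OnSuccess: func(attr ObjectAttr) {
			total := copied.Add(uint64(attr.Length))
			log.Printf("copied object of %d bytes, %d bytes so far", attr.Length, total)
		},
	})
}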

@@ -187,7 +208,7 @@ func (c *Copier) copyLocal(ctx context.Context, attr ObjectAttr, destKey, srcBuc
}
defer obj.Body.Close()

var body io.Reader = bufio.NewReaderSize(obj.Body, c.bufSizeBytePerWorker)
body := c.newProcessReader(bufio.NewReaderSize(obj.Body, c.bufSizeBytePerWorker))
if c.lim != nil {
body = &limReader{r: body, lim: c.lim, ctx: ctx}
}
@@ -198,3 +219,18 @@ func (c *Copier) copyLocal(ctx context.Context, attr ObjectAttr, destKey, srcBuc

return nil
}

type processReader struct {
src io.Reader
len *atomic.Uint64
}

func (r *processReader) Read(p []byte) (int, error) {
n, err := r.src.Read(p)
r.len.Add(uint64(n))
return n, err
}

func (c *Copier) newProcessReader(src io.Reader) io.Reader {
return &processReader{src: src, len: &c.size}
}
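
For reference, a minimal sketch (same package assumed; imports "fmt", "io", "strings", and "sync/atomic") of the counting-reader pattern the commit adds: every Read that flows through processReader adds the bytes returned to an atomic counter, which Process can report while copyLocal is still streaming the object body.

// countBytesExample drains a processReader and prints the byte count.
// Illustrative only; not part of the commit.
func countBytesExample() {
	var n atomic.Uint64
	r := &processReader{src: strings.NewReader("hello, copier"), len: &n}
	if _, err := io.Copy(io.Discard, r); err != nil {
		panic(err)
	}
	fmt.Println(n.Load()) // prints 13
}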
