Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

分片下载支持ProgressEvent #179

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 171 additions & 3 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,14 @@ func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOpti

if opt != nil && opt.Listener != nil {
if err == nil && resp != nil {
if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil {
resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
if contentLength := resp.Header.Get("Content-Length"); contentLength != "" {
if totalBytes, e := strconv.ParseInt(contentLength, 10, 64); e == nil {
resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
}
} else {
// 支持multi download range下中间的chunk没有content-length的场景,
// multi process也不看这里的totalBytes
resp.Body = TeeReader(resp.Body, nil, 0, opt.Listener)
}
}
}
Expand Down Expand Up @@ -936,6 +942,143 @@ func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int,

}

type multiDownloadProgress struct {
totalSize int64
chunkProgress []*ProgressEvent
chunks int
listener ProgressListener
notify chan *chunkProgressEvent
stopc chan struct{}
}

func newMultiDownloadProgress(listener ProgressListener, chunks int, totalSize int64) *multiDownloadProgress {
return &multiDownloadProgress{
totalSize: totalSize,
chunkProgress: make([]*ProgressEvent, chunks),
chunks: chunks,
listener: listener,
notify: make(chan *chunkProgressEvent, chunks),
stopc: make(chan struct{}),
}
}

func (p *multiDownloadProgress) ChunkListener(number int) ProgressListener {
return &chunkDownloadProgress{
number: number,
notify: p.notify,
}
}

func (p *multiDownloadProgress) Start() {
var (
et ProgressEventType
consumed int64
started bool
)
for {
select {
case <-p.stopc:
return
case e := <-p.notify:
consumed += e.event.ConsumedBytes
if o := p.chunkProgress[e.number-1]; o != nil {
consumed -= o.ConsumedBytes
}
p.chunkProgress[e.number-1] = e.event

// eventType聚合逻辑改为所有chunkjob都完成之后判定成功和失败
// 因为就算提前给失败,Download function并不会终止执行
// 用户捕获期间event中的Error即可不丢失数据
if e.event.EventType == ProgressCompletedEvent || e.event.EventType == ProgressFailedEvent {
completed := 0
failed := 0
for _, c := range p.chunkProgress {
if c != nil {
if c.EventType == ProgressCompletedEvent {
completed += 1
} else if c.EventType == ProgressFailedEvent {
failed += 1
}
}
}

if total := completed + failed; total < p.chunks {
if started {
et = ProgressDataEvent
}
} else if failed > 0 {
et = ProgressFailedEvent
} else {
et = ProgressCompletedEvent
}
} else if started && et == ProgressStartedEvent {
et = ProgressDataEvent
}

progressCallback(
p.listener,
newProgressEvent(
et,
e.event.RWBytes,
consumed,
p.totalSize,
e.event.Err,
),
)
started = true
}
}
}

func (p *multiDownloadProgress) Stop() {
close(p.stopc)
}

func (p *multiDownloadProgress) ChunkFailed(res *Results, errs ...error) {
err := res.err
if len(errs) > 0 {
err = errs[0]
}
p.notify <- &chunkProgressEvent{
number: res.PartNumber,
event: newProgressEvent(
ProgressFailedEvent,
0,
0,
// 因为totalSize不看chunk progress event所以可以填0
0,
err,
),
}
}

func (p *multiDownloadProgress) ChunkAlreadyDone(chunk Chunk) {
p.notify <- &chunkProgressEvent{
number: chunk.Number,
event: newProgressEvent(
ProgressCompletedEvent,
0,
chunk.Size,
chunk.Size,
),
}
}

type chunkDownloadProgress struct {
number int
notify chan *chunkProgressEvent
parent *multiDownloadProgress
}

func (p *chunkDownloadProgress) ProgressChangedCallback(event *ProgressEvent) {
p.notify <- &chunkProgressEvent{p.number, event}
}

type chunkProgressEvent struct {
number int
event *ProgressEvent
}

func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
opt := &ObjectListUploadsOptions{
Prefix: name,
Expand Down Expand Up @@ -1381,15 +1524,29 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
go downloadWorker(ctx, s, chjobs, chresults)
}

var multiProgress *multiDownloadProgress
if opt.Opt != nil && opt.Opt.Listener != nil {
multiProgress = newMultiDownloadProgress(
opt.Opt.Listener,
len(chunks),
totalBytes,
)
go multiProgress.Start()
}

go func() {
for _, chunk := range chunks {
if chunk.Done {
if multiProgress != nil {
multiProgress.ChunkAlreadyDone(chunk)
}
continue
}
var downOpt ObjectGetOptions
if opt.Opt != nil {
downOpt = *opt.Opt
downOpt.Listener = nil // listener need to set nil
// downOpt.Listener = nil // listener need to set nil
downOpt.Listener = multiProgress.ChunkListener(chunk.Number)
}
job := &Jobs{
Name: name,
Expand All @@ -1413,8 +1570,16 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
res := <-chresults
if res.Resp == nil || res.err != nil {
err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error())
if multiProgress != nil {
multiProgress.ChunkFailed(res, err)
}
continue
}

// 没有complete事件的chunk在这里补偿一下
if multiProgress != nil {
multiProgress.ChunkAlreadyDone(chunks[res.PartNumber-1])
}
// Dump CheckPoint Info
if opt.CheckPoint {
cpfd.Truncate(0)
Expand All @@ -1430,6 +1595,9 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
if cpfd != nil {
cpfd.Close()
}
if multiProgress != nil {
multiProgress.Stop()
}
if err != nil {
return nil, err
}
Expand Down
116 changes: 116 additions & 0 deletions object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,122 @@ func TestObjectService_DownloadWithCheckPoint(t *testing.T) {
}
}

type fullProgressListener struct {
}

func (l *fullProgressListener) ProgressChangedCallback(event *ProgressEvent) {
switch event.EventType {
case ProgressStartedEvent:
fmt.Printf("Transfer Start [ConsumedBytes/TotalBytes: %d/%d]\n",
event.ConsumedBytes, event.TotalBytes)
case ProgressDataEvent:
fmt.Printf("Transfer Data [ConsumedBytes/TotalBytes: %d/%d, %d%%] [Err: %v]\n",
event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes, event.Err)
case ProgressCompletedEvent:
fmt.Printf("\nTransfer Complete [ConsumedBytes/TotalBytes: %d/%d]\n",
event.ConsumedBytes, event.TotalBytes)
case ProgressFailedEvent:
fmt.Printf("\nTransfer Failed [ConsumedBytes/TotalBytes: %d/%d] [Err: %v]\n",
event.ConsumedBytes, event.TotalBytes, event.Err)
default:
fmt.Printf("Progress Changed Error: unknown progress event type\n")
}
}

func TestObjectService_DownloadProgressWithCheckPoint(t *testing.T) {
setup()
defer teardown()

filePath := "rsp.file" + time.Now().Format(time.RFC3339)
newfile, err := os.Create(filePath)
if err != nil {
t.Fatalf("create tmp file failed")
}
defer os.Remove(filePath)
// 源文件内容
totalBytes := int64(1024*1024*9 + 123)
partSize := 1024 * 1024
b := make([]byte, totalBytes)
_, err = rand.Read(b)
newfile.Write(b)
newfile.Close()
tb := crc64.MakeTable(crc64.ECMA)
localcrc := strconv.FormatUint(crc64.Update(0, tb, b), 10)

oddok := false
var oddcount, evencount int
mux.HandleFunc("/test.go.download", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
w.Header().Add("Content-Length", strconv.FormatInt(totalBytes, 10))
w.Header().Add("x-cos-hash-crc64ecma", localcrc)
return
}
strRange := r.Header.Get("Range")
slice1 := strings.Split(strRange, "=")
slice2 := strings.Split(slice1[1], "-")
start, _ := strconv.ParseInt(slice2[0], 10, 64)
end, _ := strconv.ParseInt(slice2[1], 10, 64)
if (start/int64(partSize))%2 == 1 {
if oddok {
io.Copy(w, bytes.NewBuffer(b[start:end+1]))
} else {
// 数据校验失败, Download做3次重试
io.Copy(w, bytes.NewBuffer(b[start:end]))
}
oddcount++
} else {
io.Copy(w, bytes.NewBuffer(b[start:end+1]))
evencount++
}
})

opt := &MultiDownloadOptions{
Opt: &ObjectGetOptions{
Listener: &fullProgressListener{},
},
ThreadPoolSize: 3,
PartSize: 1,
CheckPoint: true,
}
downPath := "down.file" + time.Now().Format(time.RFC3339)
defer os.Remove(downPath)
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err == nil {
// 偶数块下载完成,奇数块下载失败
t.Fatalf("Object.Download returned error: %v", err)
}
fd, err := os.Open(downPath)
if err != nil {
t.Fatalf("Object Download Open File Failed:%v", err)
}
offset := 0
for i := 0; i < 10; i++ {
bs, _ := ioutil.ReadAll(io.LimitReader(fd, int64(partSize)))
offset += len(bs)
if i%2 == 1 {
bs[len(bs)-1] = b[offset-1]
}
if bytes.Compare(bs, b[i*partSize:offset]) != 0 {
t.Fatalf("Compare Error, index:%v, len:%v, offset:%v", i, len(bs), offset)
}
}
fd.Close()

if oddcount != 15 || evencount != 5 {
t.Fatalf("Object.Download failed, odd:%v, even:%v", oddcount, evencount)
}
// 设置奇数块OK
oddok = true
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err != nil {
// 下载成功
t.Fatalf("Object.Download returned error: %v", err)
}
if oddcount != 20 || evencount != 5 {
t.Fatalf("Object.Download failed, odd:%v, even:%v", oddcount, evencount)
}
}

func TestObjectService_GetTagging(t *testing.T) {
setup()
defer teardown()
Expand Down
1 change: 1 addition & 0 deletions progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (r *teeReader) Read(p []byte) (int, error) {
event := newProgressEvent(ProgressFailedEvent, 0, r.consumedBytes, r.totalBytes, err)
progressCallback(r.listener, event)
}

if n > 0 {
r.consumedBytes += int64(n)
if r.writer != nil {
Expand Down