diff --git a/object.go b/object.go index 4274fa2..f32682b 100644 --- a/object.go +++ b/object.go @@ -78,8 +78,14 @@ func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOpti if opt != nil && opt.Listener != nil { if err == nil && resp != nil { - if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil { - resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener) + if contentLength := resp.Header.Get("Content-Length"); contentLength != "" { + if totalBytes, e := strconv.ParseInt(contentLength, 10, 64); e == nil { + resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener) + } + } else { + // 支持multi download range下中间的chunk没有content-length的场景, + // multi process也不看这里的totalBytes + resp.Body = TeeReader(resp.Body, nil, 0, opt.Listener) } } } @@ -936,6 +942,143 @@ func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, } +type multiDownloadProgress struct { + totalSize int64 + chunkProgress []*ProgressEvent + chunks int + listener ProgressListener + notify chan *chunkProgressEvent + stopc chan struct{} +} + +func newMultiDownloadProgress(listener ProgressListener, chunks int, totalSize int64) *multiDownloadProgress { + return &multiDownloadProgress{ + totalSize: totalSize, + chunkProgress: make([]*ProgressEvent, chunks), + chunks: chunks, + listener: listener, + notify: make(chan *chunkProgressEvent, chunks), + stopc: make(chan struct{}), + } +} + +func (p *multiDownloadProgress) ChunkListener(number int) ProgressListener { + return &chunkDownloadProgress{ + number: number, + notify: p.notify, + } +} + +func (p *multiDownloadProgress) Start() { + var ( + et ProgressEventType + consumed int64 + started bool + ) + for { + select { + case <-p.stopc: + return + case e := <-p.notify: + consumed += e.event.ConsumedBytes + if o := p.chunkProgress[e.number-1]; o != nil { + consumed -= o.ConsumedBytes + } + p.chunkProgress[e.number-1] = e.event + + // eventType聚合逻辑改为所有chunkjob都完成之后判定成功和失败 + // 因为就算提前给失败,Download function并不会终止执行 + // 用户捕获期间event中的Error即可不丢失数据 + if e.event.EventType == ProgressCompletedEvent || e.event.EventType == ProgressFailedEvent { + completed := 0 + failed := 0 + for _, c := range p.chunkProgress { + if c != nil { + if c.EventType == ProgressCompletedEvent { + completed += 1 + } else if c.EventType == ProgressFailedEvent { + failed += 1 + } + } + } + + if total := completed + failed; total < p.chunks { + if started { + et = ProgressDataEvent + } + } else if failed > 0 { + et = ProgressFailedEvent + } else { + et = ProgressCompletedEvent + } + } else if started && et == ProgressStartedEvent { + et = ProgressDataEvent + } + + progressCallback( + p.listener, + newProgressEvent( + et, + e.event.RWBytes, + consumed, + p.totalSize, + e.event.Err, + ), + ) + started = true + } + } +} + +func (p *multiDownloadProgress) Stop() { + close(p.stopc) +} + +func (p *multiDownloadProgress) ChunkFailed(res *Results, errs ...error) { + err := res.err + if len(errs) > 0 { + err = errs[0] + } + p.notify <- &chunkProgressEvent{ + number: res.PartNumber, + event: newProgressEvent( + ProgressFailedEvent, + 0, + 0, + // 因为totalSize不看chunk progress event所以可以填0 + 0, + err, + ), + } +} + +func (p *multiDownloadProgress) ChunkAlreadyDone(chunk Chunk) { + p.notify <- &chunkProgressEvent{ + number: chunk.Number, + event: newProgressEvent( + ProgressCompletedEvent, + 0, + chunk.Size, + chunk.Size, + ), + } +} + +type chunkDownloadProgress struct { + number int + notify chan *chunkProgressEvent + parent *multiDownloadProgress +} + +func (p *chunkDownloadProgress) ProgressChangedCallback(event *ProgressEvent) { + p.notify <- &chunkProgressEvent{p.number, event} +} + +type chunkProgressEvent struct { + number int + event *ProgressEvent +} + func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) { opt := &ObjectListUploadsOptions{ Prefix: name, @@ -1381,15 +1524,29 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri go downloadWorker(ctx, s, chjobs, chresults) } + var multiProgress *multiDownloadProgress + if opt.Opt != nil && opt.Opt.Listener != nil { + multiProgress = newMultiDownloadProgress( + opt.Opt.Listener, + len(chunks), + totalBytes, + ) + go multiProgress.Start() + } + go func() { for _, chunk := range chunks { if chunk.Done { + if multiProgress != nil { + multiProgress.ChunkAlreadyDone(chunk) + } continue } var downOpt ObjectGetOptions if opt.Opt != nil { downOpt = *opt.Opt - downOpt.Listener = nil // listener need to set nil + // downOpt.Listener = nil // listener need to set nil + downOpt.Listener = multiProgress.ChunkListener(chunk.Number) } job := &Jobs{ Name: name, @@ -1413,8 +1570,16 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri res := <-chresults if res.Resp == nil || res.err != nil { err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error()) + if multiProgress != nil { + multiProgress.ChunkFailed(res, err) + } continue } + + // 没有complete事件的chunk在这里补偿一下 + if multiProgress != nil { + multiProgress.ChunkAlreadyDone(chunks[res.PartNumber-1]) + } // Dump CheckPoint Info if opt.CheckPoint { cpfd.Truncate(0) @@ -1430,6 +1595,9 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri if cpfd != nil { cpfd.Close() } + if multiProgress != nil { + multiProgress.Stop() + } if err != nil { return nil, err } diff --git a/object_test.go b/object_test.go index 179eab3..9cffca3 100644 --- a/object_test.go +++ b/object_test.go @@ -1009,6 +1009,122 @@ func TestObjectService_DownloadWithCheckPoint(t *testing.T) { } } +type fullProgressListener struct { +} + +func (l *fullProgressListener) ProgressChangedCallback(event *ProgressEvent) { + switch event.EventType { + case ProgressStartedEvent: + fmt.Printf("Transfer Start [ConsumedBytes/TotalBytes: %d/%d]\n", + event.ConsumedBytes, event.TotalBytes) + case ProgressDataEvent: + fmt.Printf("Transfer Data [ConsumedBytes/TotalBytes: %d/%d, %d%%] [Err: %v]\n", + event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes, event.Err) + case ProgressCompletedEvent: + fmt.Printf("\nTransfer Complete [ConsumedBytes/TotalBytes: %d/%d]\n", + event.ConsumedBytes, event.TotalBytes) + case ProgressFailedEvent: + fmt.Printf("\nTransfer Failed [ConsumedBytes/TotalBytes: %d/%d] [Err: %v]\n", + event.ConsumedBytes, event.TotalBytes, event.Err) + default: + fmt.Printf("Progress Changed Error: unknown progress event type\n") + } +} + +func TestObjectService_DownloadProgressWithCheckPoint(t *testing.T) { + setup() + defer teardown() + + filePath := "rsp.file" + time.Now().Format(time.RFC3339) + newfile, err := os.Create(filePath) + if err != nil { + t.Fatalf("create tmp file failed") + } + defer os.Remove(filePath) + // 源文件内容 + totalBytes := int64(1024*1024*9 + 123) + partSize := 1024 * 1024 + b := make([]byte, totalBytes) + _, err = rand.Read(b) + newfile.Write(b) + newfile.Close() + tb := crc64.MakeTable(crc64.ECMA) + localcrc := strconv.FormatUint(crc64.Update(0, tb, b), 10) + + oddok := false + var oddcount, evencount int + mux.HandleFunc("/test.go.download", func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodHead { + w.Header().Add("Content-Length", strconv.FormatInt(totalBytes, 10)) + w.Header().Add("x-cos-hash-crc64ecma", localcrc) + return + } + strRange := r.Header.Get("Range") + slice1 := strings.Split(strRange, "=") + slice2 := strings.Split(slice1[1], "-") + start, _ := strconv.ParseInt(slice2[0], 10, 64) + end, _ := strconv.ParseInt(slice2[1], 10, 64) + if (start/int64(partSize))%2 == 1 { + if oddok { + io.Copy(w, bytes.NewBuffer(b[start:end+1])) + } else { + // 数据校验失败, Download做3次重试 + io.Copy(w, bytes.NewBuffer(b[start:end])) + } + oddcount++ + } else { + io.Copy(w, bytes.NewBuffer(b[start:end+1])) + evencount++ + } + }) + + opt := &MultiDownloadOptions{ + Opt: &ObjectGetOptions{ + Listener: &fullProgressListener{}, + }, + ThreadPoolSize: 3, + PartSize: 1, + CheckPoint: true, + } + downPath := "down.file" + time.Now().Format(time.RFC3339) + defer os.Remove(downPath) + _, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt) + if err == nil { + // 偶数块下载完成,奇数块下载失败 + t.Fatalf("Object.Download returned error: %v", err) + } + fd, err := os.Open(downPath) + if err != nil { + t.Fatalf("Object Download Open File Failed:%v", err) + } + offset := 0 + for i := 0; i < 10; i++ { + bs, _ := ioutil.ReadAll(io.LimitReader(fd, int64(partSize))) + offset += len(bs) + if i%2 == 1 { + bs[len(bs)-1] = b[offset-1] + } + if bytes.Compare(bs, b[i*partSize:offset]) != 0 { + t.Fatalf("Compare Error, index:%v, len:%v, offset:%v", i, len(bs), offset) + } + } + fd.Close() + + if oddcount != 15 || evencount != 5 { + t.Fatalf("Object.Download failed, odd:%v, even:%v", oddcount, evencount) + } + // 设置奇数块OK + oddok = true + _, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt) + if err != nil { + // 下载成功 + t.Fatalf("Object.Download returned error: %v", err) + } + if oddcount != 20 || evencount != 5 { + t.Fatalf("Object.Download failed, odd:%v, even:%v", oddcount, evencount) + } +} + func TestObjectService_GetTagging(t *testing.T) { setup() defer teardown() diff --git a/progress.go b/progress.go index 3fae1fe..a893612 100644 --- a/progress.go +++ b/progress.go @@ -71,6 +71,7 @@ func (r *teeReader) Read(p []byte) (int, error) { event := newProgressEvent(ProgressFailedEvent, 0, r.consumedBytes, r.totalBytes, err) progressCallback(r.listener, event) } + if n > 0 { r.consumedBytes += int64(n) if r.writer != nil {