Commit 79f4172 (Merge #2): Fix Data Race
melbahja authored Aug 7, 2020
2 parents eb5b343 + d25c6e8
Showing 4 changed files with 237 additions and 119 deletions.
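The race being fixed: in the old code, each download goroutine wrote its chunk's Path while a separate merge goroutine polled that same field with no synchronization. The commit introduces Downloaded/Merged flags and a sync.RWMutex to guard them. A minimal standalone sketch of that pattern follows; the names are illustrative, not the library's actual types.

package sketch

import "sync"

// chunkState mirrors the idea of got's Chunk bookkeeping fields;
// the names here are illustrative only.
type chunkState struct {
    Path       string
    Downloaded bool
}

type downloader struct {
    mu     sync.RWMutex
    chunks []chunkState
}

// Writer side: each download goroutine marks its chunk done
// under the write lock.
func (d *downloader) markDone(i int, path string) {
    d.mu.Lock()
    d.chunks[i].Path = path
    d.chunks[i].Downloaded = true
    d.mu.Unlock()
}

// Reader side: the merge loop polls under the read lock instead of
// reading the fields bare, which is what raced before this commit.
func (d *downloader) ready(i int) bool {
    d.mu.RLock()
    defer d.mu.RUnlock()
    return d.chunks[i].Downloaded
}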
8 changes: 4 additions & 4 deletions chunk.go

@@ -21,6 +21,10 @@ type Chunk struct {

     // Path name where this chunk downloaded.
     Path string
+
+    Downloaded bool
+
+    Merged bool
 }

 // Download a chunk, and report to Progress, it returns error if any!
@@ -51,9 +55,5 @@ func (c *Chunk) Download(URL string, client *http.Client, dest *os.File) (err error) {

     _, err = io.Copy(dest, io.TeeReader(res.Body, c.Progress))

-    if err == nil {
-        c.Path = dest.Name()
-    }
-
     return err
 }
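The io.Copy/io.TeeReader line above is how a chunk reports progress: every byte copied to the destination also passes through the Progress writer. A self-contained sketch of that mechanism; ProgressCounter is hypothetical, not got's Progress type.

package sketch

import (
    "io"
    "sync/atomic"
)

type ProgressCounter struct {
    n int64 // bytes seen so far
}

// Write makes ProgressCounter an io.Writer; TeeReader feeds it
// every byte that io.Copy pulls from the response body.
func (p *ProgressCounter) Write(b []byte) (int, error) {
    atomic.AddInt64(&p.n, int64(len(b)))
    return len(b), nil
}

func copyWithProgress(dst io.Writer, src io.Reader, p *ProgressCounter) (int64, error) {
    return io.Copy(dst, io.TeeReader(src, p))
}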
212 changes: 118 additions & 94 deletions got.go

@@ -72,9 +72,16 @@ type (

     // Progress...
     progress *Progress
+
+    // Chunk merge index.
+    index int
+
+    // Sync mutex.
+    mu sync.RWMutex
 }
 )

 // Check Download and split file to chunks and set defaults,
 // you should call Init first then call Start
 func (d *Download) Init() error {
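The comment above states the contract: call Init first, then Start. A minimal usage sketch built only from identifiers visible in this diff (URL, Dest, Init, Start); the library's README may differ.

package main

import (
    "log"

    "github.com/melbahja/got"
)

func main() {
    // Fields and methods as they appear in this diff; other
    // options (ChunkSize, Concurrency, ...) are left at defaults.
    d := got.Download{
        URL:  "https://example.com/file.zip",
        Dest: "/tmp/file.zip",
    }

    if err := d.Init(); err != nil {
        log.Fatal(err)
    }

    if err := d.Start(); err != nil {
        log.Fatal(err)
    }
}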
@@ -98,23 +105,23 @@ func (d *Download) Init() error {
     }

     // Init progress.
-    d.progress = new(Progress)
+    d.progress = &Progress{
+        mu: d.mu,
+    }

     // Set default interval.
     if d.Interval == 0 {
         d.Interval = 20
     }

     // Get URL info.
-    d.Info, err = d.GetInfo()
-
-    if err != nil {
+    if d.Info, err = d.GetInfo(); err != nil {
         return err
     }

     // Partial content not supported 😢!
     if d.Info.Rangeable == false || d.Info.Length == 0 {
-        return err
+        return nil
     }

     // Set concurrency default to 10.
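Init consults d.Info.Rangeable and d.Info.Length to decide whether chunked downloading is possible at all. GetInfo itself is not shown in this diff; the following is only a hypothetical sketch of how such a check is commonly done (HEAD request, Accept-Ranges header), not got's actual implementation.

package sketch

import "net/http"

type Info struct {
    Length    int64
    Rangeable bool
}

// getInfo is a guess at the general shape of this kind of probe.
func getInfo(client *http.Client, url string) (*Info, error) {
    req, err := http.NewRequest(http.MethodHead, url, nil)
    if err != nil {
        return nil, err
    }

    res, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()

    return &Info{
        Length:    res.ContentLength,
        Rangeable: res.Header.Get("Accept-Ranges") == "bytes",
    }, nil
}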
@@ -132,11 +139,6 @@ func (d *Download) Init() error {
         d.ChunkSize = d.ChunkSize / 2
     }

-    // Change ChunkSize if MaxChunkSize are set and ChunkSize > Max size
-    if d.MaxChunkSize > 0 && d.ChunkSize > d.MaxChunkSize {
-        d.ChunkSize = d.MaxChunkSize
-    }
-
     // Set default min chunk size to 1m, or file size / 2
     if d.MinChunkSize == 0 {

@@ -147,29 +149,39 @@ func (d *Download) Init() error {
         }
     }

-        // if Chunk size < Min size set chunk size to length / 2
+        // if Chunk size < Min size set chunk size to min.
         if d.ChunkSize < d.MinChunkSize {
             d.ChunkSize = d.MinChunkSize
         }
     }

-    // Avoid divide by zero
-    if d.ChunkSize > 0 {
-        chunksLen = d.Info.Length / d.ChunkSize
-    }
+    // Change ChunkSize if MaxChunkSize are set and ChunkSize > Max size
+    if d.MaxChunkSize > 0 && d.ChunkSize > d.MaxChunkSize {
+        d.ChunkSize = d.MaxChunkSize
+
+    } else if d.ChunkSize > d.Info.Length {
+
+        d.ChunkSize = d.Info.Length / 2
+    }
+
+    // Set chunks.
+    chunksLen = d.Info.Length / d.ChunkSize

     // Set chunk ranges.
     for ; i < chunksLen; i++ {

-        startRange = (d.ChunkSize * i) + 1
+        startRange = (d.ChunkSize * i) + i
+        endRange = startRange + d.ChunkSize

         if i == 0 {

             startRange = 0
-        }
-
-        endRange = startRange + d.ChunkSize
+
+        } else if d.chunks[i - 1].End == 0 {
+
+            if i == (chunksLen - 1) {
+                break
+            }
+        }

         if endRange > d.Info.Length || i == (chunksLen - 1) {
             endRange = 0
         }

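Worked example of the corrected range arithmetic: with ChunkSize = 100 and a 301-byte file (chunksLen = 3), the old startRange formula (d.ChunkSize * i) + 1 yields ranges 0-100, 101-201, 201-301, so byte 201 lands in two chunks; the new (d.ChunkSize * i) + i yields 0-100, 101-201, 202-(EOF). A standalone sketch with illustrative values:

package main

import "fmt"

func main() {
    var chunkSize, length int64 = 100, 301
    chunksLen := length / chunkSize // 3

    for i := int64(0); i < chunksLen; i++ {
        start := (chunkSize * i) + i // new formula: +i, not +1
        end := start + chunkSize
        if i == 0 {
            start = 0
        }
        if end > length || i == chunksLen-1 {
            end = 0 // 0 means "through end of file" downstream
        }
        fmt.Printf("chunk %d: bytes=%d-%d\n", i, start, end)
    }
}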
@@ -205,16 +217,43 @@ func (d *Download) Start() (err error) {

     // Run progress func.
     go d.progress.Run(d)

-    // Start download chunks.
-    go d.work(&errChan, &okChan)
+    // Partial content not supported,
+    // just download the file in one chunk.
+    if len(d.chunks) == 0 {
+
+        file, err := os.Create(d.Dest)
+
+        if err != nil {
+            return err
+        }
+
+        defer file.Close()
+
+        chunk := &Chunk{
+            Progress: d.progress,
+        }
+
+        return chunk.Download(d.URL, d.client, file)
+    }
+
+    // Download chunks.
+    go d.dl(&errChan)
+
+    // Merge chunks.
+    go d.merge(&errChan, &okChan)

     // Wait for chunks...
     for {

         select {

         case err := <-errChan:
-            return err
+
+            if err != nil {
+                return err
+            }
+
+            break

         case <-okChan:

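Note the control flow in the select above: dl sends the result of every chunk download on errChan, nil included, so Start only returns early on a real error; break leaves the select, not the for, and okChan ends the wait once merging finishes. A compact sketch of that fan-in pattern, with illustrative names:

package sketch

func wait(errc chan error, done chan bool) error {
    for {
        select {
        case err := <-errc:
            if err != nil {
                return err // first real failure wins
            }
            // nil result: a chunk finished cleanly, keep waiting
        case <-done:
            return nil // all chunks merged
        }
    }
}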
@@ -258,115 +297,100 @@ func (d *Download) GetInfo() (*Info, error) {
     }, nil
 }

-// Download chunks and in same time merge them into dest path.
-func (d *Download) work(echan *chan error, done *chan bool) {
-
-    var (
-        // Next chunk index.
-        next int = 0
-
-        // Waiting group.
-        swg sync.WaitGroup
-
-        // Concurrency limit.
-        max chan int = make(chan int, d.Concurrency)
-
-        // Chunk file.
-        chunk *os.File
-    )
-
-    go func() {
-
-        chunksLen := len(d.chunks)
-
-        file, err := os.Create(d.Dest)
-
-        if err != nil {
-            *echan <- err
-            return
-        }
-
-        defer file.Close()
-
-        // Partial content not supported or file length is unknown,
-        // so just download it directly in one chunk!
-        if chunksLen == 0 {
-
-            chunk := &Chunk{
-                Progress: d.progress,
-            }
-
-            if err := chunk.Download(d.URL, d.client, file); err != nil {
-                *echan <- err
-                return
-            }
-
-            *done <- true
-            return
-        }
-
-        for {
-
-            for i := 0; i < len(d.chunks); i++ {
-
-                if next == i && d.chunks[i].Path != "" {
-
-                    chunk, err = os.Open(d.chunks[i].Path)
-
-                    if err != nil {
-                        *echan <- err
-                        return
-                    }
-
-                    // Copy chunk content to dest file.
-                    _, err = io.Copy(file, chunk)
-
-                    // Close chunk fd.
-                    chunk.Close()
-
-                    if err != nil {
-                        *echan <- err
-                        return
-                    }
-
-                    next++
-                }
-            }
-
-            if next == len(d.chunks) {
-                *done <- true
-                return
-            }
-
-            time.Sleep(6 * time.Millisecond)
-        }
-    }()
+// Merge downloaded chunks.
+func (d *Download) merge(echan *chan error, done *chan bool) {
+
+    file, err := os.Create(d.Dest)
+
+    if err != nil {
+        *echan <- err
+        return
+    }
+
+    defer file.Close()
+
+    chunksLen := len(d.chunks)
+
+    for {
+
+        for i := range d.chunks {
+
+            d.mu.RLock()
+            if d.chunks[i].Downloaded && d.chunks[i].Merged == false && i == d.index {
+
+                chunk, err := os.Open(d.chunks[i].Path)
+
+                if err != nil {
+                    *echan <- err
+                    return
+                }
+
+                _, err = io.Copy(file, chunk)
+
+                if err != nil {
+                    *echan <- err
+                    return
+                }
+
+                go chunk.Close()
+
+                // Sync dest file.
+                file.Sync()
+
+                d.chunks[i].Merged = true
+                d.index++
+            }
+            d.mu.RUnlock()
+
+            // done, all chunks merged.
+            if d.index == chunksLen {
+                *done <- true
+                return
+            }
+        }
+
+        time.Sleep(6 * time.Millisecond)
+    }
+}
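The merge loop above appends only the chunk whose index equals d.index, so chunks that finish out of order wait their turn. The same in-order gate can also be written without polling; a hypothetical event-driven sketch, not got's code:

package sketch

import "sync"

type merger struct {
    mu   sync.Mutex
    done map[int]string // chunk index -> temp file path
    next int            // next index to append
}

func newMerger() *merger {
    return &merger{done: make(map[int]string)}
}

// offer records a finished chunk and returns every path that is now
// ready to be appended to the destination, in order.
func (m *merger) offer(i int, path string) []string {
    m.mu.Lock()
    defer m.mu.Unlock()

    m.done[i] = path

    var ready []string
    for p, ok := m.done[m.next]; ok; p, ok = m.done[m.next] {
        ready = append(ready, p)
        m.next++
    }
    return ready
}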


+// Download chunks.
+func (d *Download) dl(echan *chan error) {
+
+    var (
+        // Waiting group.
+        swg sync.WaitGroup
+
+        // Concurrency limit.
+        max chan int = make(chan int, d.Concurrency)
+    )

     for i := 0; i < len(d.chunks); i++ {

         max <- 1
         swg.Add(1)

         go func(i int) {

             defer swg.Done()

-            chunk, err := ioutil.TempFile(d.temp, fmt.Sprintf("chunk-%d", i))
+            chunk, err := os.Create(fmt.Sprintf("%s/chunk-%d", d.temp, i))

             if err != nil {
                 *echan <- err
                 return
             }

             // Close chunk fd.
             defer chunk.Close()

-            // Download the chunk.
-            if err = d.chunks[i].Download(d.URL, d.client, chunk); err != nil {
-                *echan <- err
-            }
+            // Download chunk.
+            *echan <- d.chunks[i].Download(d.URL, d.client, chunk)
+
+            d.mu.Lock()
+            d.chunks[i].Path = chunk.Name()
+            d.chunks[i].Downloaded = true
+            d.mu.Unlock()

             <-max
         }(i)
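dl bounds its parallelism with a buffered channel used as a semaphore plus a WaitGroup, the same pattern as the max/swg pair above. A standalone sketch of that pattern with a hypothetical worker body:

package main

import (
    "fmt"
    "sync"
)

func main() {
    const concurrency = 3
    var wg sync.WaitGroup
    sem := make(chan struct{}, concurrency) // at most 3 in flight

    for i := 0; i < 10; i++ {
        sem <- struct{}{} // blocks while 3 workers are running
        wg.Add(1)

        go func(i int) {
            defer wg.Done()
            fmt.Println("chunk", i) // stand-in for downloading chunk i
            <-sem                   // release the slot
        }(i)
    }

    wg.Wait()
}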