Skip to content

Commit

Permalink
cmd/sync: limit the maximum memory size for multiple upload (#4135)
Browse files Browse the repository at this point in the history
close #4108
limit the maximum memory size for multiple upload is 32MB times the number of threads

---------

Co-authored-by: Davies Liu <[email protected]>
  • Loading branch information
zhijian-pro and davies authored Nov 2, 2023
1 parent b72106c commit 66ebdb1
Showing 1 changed file with 29 additions and 3 deletions.
32 changes: 29 additions & 3 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"
"time"

"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/object"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/juju/ratelimit"
Expand Down Expand Up @@ -177,6 +178,13 @@ var bufPool = sync.Pool{
},
}

var (
// for multiple upload mem limit
availMem int64
mutex sync.Mutex
cond = sync.NewCond(&mutex)
)

func try(n int, f func() error) (err error) {
for i := 0; i < n; i++ {
err = f()
Expand Down Expand Up @@ -429,15 +437,27 @@ func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, uploa
abort := make(chan struct{})
parts := make([]*object.Part, n)
errs := make(chan error, n)

for i := 0; i < n; i++ {
go func(num int) {
sz := partSize
if num == n-1 {
sz = size - int64(num)*partSize
}
if limiter != nil {
limiter.Wait(sz)

mutex.Lock()
for availMem < 0 {
cond.Wait()
}
availMem -= partSize
mutex.Unlock()
defer func() {
mutex.Lock()
availMem += partSize
mutex.Unlock()
cond.Signal()
}()

select {
case <-abort:
errs <- fmt.Errorf("aborted")
Expand All @@ -448,7 +468,12 @@ func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, uploa
}()
}

data := make([]byte, sz)
if limiter != nil {
limiter.Wait(sz)
}
p := chunk.NewOffPage(int(sz))
defer p.Release()
data := p.Data
if err := try(3, func() error {
in, err := src.Get(key, int64(num)*partSize, sz)
if err != nil {
Expand Down Expand Up @@ -1002,6 +1027,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {
if config.Manager != "" {
bufferSize = 100
}
availMem = int64(config.Threads * 32 << 20)
tasks := make(chan object.Object, bufferSize)
wg := sync.WaitGroup{}
concurrent = make(chan int, config.Threads)
Expand Down

0 comments on commit 66ebdb1

Please sign in to comment.