diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index da737f274aff..c6c8e6b83579 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -28,6 +28,7 @@ import ( "sync" "time" + "github.com/juicedata/juicefs/pkg/chunk" "github.com/juicedata/juicefs/pkg/object" "github.com/juicedata/juicefs/pkg/utils" "github.com/juju/ratelimit" @@ -177,6 +178,13 @@ var bufPool = sync.Pool{ }, } +var ( + // for multiple upload mem limit + availMem int64 + mutex sync.Mutex + cond = sync.NewCond(&mutex) +) + func try(n int, f func() error) (err error) { for i := 0; i < n; i++ { err = f() @@ -429,15 +437,27 @@ func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, uploa abort := make(chan struct{}) parts := make([]*object.Part, n) errs := make(chan error, n) + for i := 0; i < n; i++ { go func(num int) { sz := partSize if num == n-1 { sz = size - int64(num)*partSize } - if limiter != nil { - limiter.Wait(sz) + + mutex.Lock() + for availMem < 0 { + cond.Wait() } + availMem -= partSize + mutex.Unlock() + defer func() { + mutex.Lock() + availMem += partSize + mutex.Unlock() + cond.Signal() + }() + select { case <-abort: errs <- fmt.Errorf("aborted") @@ -448,7 +468,12 @@ func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, uploa }() } - data := make([]byte, sz) + if limiter != nil { + limiter.Wait(sz) + } + p := chunk.NewOffPage(int(sz)) + defer p.Release() + data := p.Data if err := try(3, func() error { in, err := src.Get(key, int64(num)*partSize, sz) if err != nil { @@ -1002,6 +1027,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { if config.Manager != "" { bufferSize = 100 } + availMem = int64(config.Threads * 32 << 20) tasks := make(chan object.Object, bufferSize) wg := sync.WaitGroup{} concurrent = make(chan int, config.Threads)