Skip to content

Commit

Permalink
gateway: add rw locks to lock multiple requests within a single machi…
Browse files Browse the repository at this point in the history
…ne (#4845)
  • Loading branch information
zhijian-pro authored May 15, 2024
1 parent fe3ff53 commit b4aadd3
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ require (
xorm.io/builder v0.3.7 // indirect
)

replace github.com/minio/minio v0.0.0-20210206053228-97fe57bba92c => github.com/juicedata/minio v0.0.0-20240412074428-6fa8baf7e06c
replace github.com/minio/minio v0.0.0-20210206053228-97fe57bba92c => github.com/juicedata/minio v0.0.0-20240515031419-0bdee24fbb42

replace github.com/hanwen/go-fuse/v2 v2.1.1-0.20210611132105-24a1dfe6b4f8 => github.com/juicedata/go-fuse/v2 v2.1.1-0.20240425033113-7c40cb5eb3e9

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,8 @@ github.com/juicedata/gogfapi v0.0.0-20230626071140-fc28e5537825 h1:7KrwI4HPqvNLK
github.com/juicedata/gogfapi v0.0.0-20230626071140-fc28e5537825/go.mod h1:Ho5G4KgrgbMKW0buAJdOmYoJcOImkzznJQaLiATrsx4=
github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible h1:2/ttSmYoX+QMegpNyAJR0Y6aHcVk57F7RJit5xN2T/s=
github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible/go.mod h1:Ukwa8ffRQLV6QRwpqGioPjn2Wnf7TBDA4DbennDOqHE=
github.com/juicedata/minio v0.0.0-20240412074428-6fa8baf7e06c h1:31cU5yupOAL1ZtTz3iMjdVqJuB395EXy7jUSk27wqqo=
github.com/juicedata/minio v0.0.0-20240412074428-6fa8baf7e06c/go.mod h1:UOWyfa3ls1tnpJrNw2yzGqfrwM4nzsZq/qz+zd6H+/Q=
github.com/juicedata/minio v0.0.0-20240515031419-0bdee24fbb42 h1:WdtK8wG9L/Vwc8PNdfReq4Rf6NT6ZQDVw0+8Z9Cj5vk=
github.com/juicedata/minio v0.0.0-20240515031419-0bdee24fbb42/go.mod h1:UOWyfa3ls1tnpJrNw2yzGqfrwM4nzsZq/qz+zd6H+/Q=
github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b h1:0/6suPNZnrOlRlBaU/Bnitu8HiKkkLSzQhHbwQ9AysM=
github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b/go.mod h1:NXGsfPGx6G2JssqvEcULtDqUrxuuYs4llpv8W6ZUpzk=
github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI=
Expand Down
49 changes: 42 additions & 7 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,9 +1166,10 @@ func (n *jfsObjects) cleanup() {
}

type jfsFLock struct {
inode meta.Ino
owner uint64
meta meta.Meta
inode meta.Ino
owner uint64
meta meta.Meta
localLock sync.RWMutex
}

func (j *jfsFLock) GetLock(ctx context.Context, timeout *minio.DynamicTimeout) (newCtx context.Context, timedOutErr error) {
Expand All @@ -1183,19 +1184,46 @@ func (j *jfsFLock) getFlockWithTimeOut(ctx context.Context, ltype uint32, timeou
start := time.Now()
deadline := start.Add(timeout.Timeout())
lockStr := "write"

var getLockFunc func() bool
var unlockFunc func()
var getLock bool
if ltype == meta.F_RDLCK {
getLockFunc = j.localLock.TryRLock
unlockFunc = j.localLock.RUnlock
lockStr = "read"
} else {
getLockFunc = j.localLock.TryLock
unlockFunc = j.localLock.Unlock
}

for {
if errno := j.meta.Flock(mctx, j.inode, j.owner, ltype, false); errno != 0 && !errors.Is(errno, syscall.EAGAIN) {
logger.Errorf("failed to get %s lock for inode %d by owner %d, error : %s", lockStr, j.inode, j.owner, errno)
getLock = getLockFunc()
if getLock {
break
}
if time.Now().After(deadline) {
timeout.LogFailure()
logger.Errorf("get %s lock timed out ino:%d", lockStr, j.inode)
return ctx, minio.OperationTimedOut{}
}
time.Sleep(5 * time.Millisecond)
}

for {
if errno := j.meta.Flock(mctx, j.inode, j.owner, ltype, false); errno != 0 {
if !errors.Is(errno, syscall.EAGAIN) {
logger.Errorf("failed to get %s lock for inode %d by owner %d, error : %s", lockStr, j.inode, j.owner, errno)
}
} else {
timeout.LogSuccess(time.Since(start))
return ctx, nil
}

if time.Now().After(deadline) {
unlockFunc()
timeout.LogFailure()
logger.Errorf("get write lock timed out ino:%d", j.inode)
logger.Errorf("get %s lock timed out ino:%d", lockStr, j.inode)
return ctx, minio.OperationTimedOut{}
}
time.Sleep(5 * time.Millisecond)
Expand All @@ -1209,14 +1237,21 @@ func (j *jfsFLock) Unlock() {
if errno := j.meta.Flock(mctx, j.inode, j.owner, meta.F_UNLCK, true); errno != 0 {
logger.Errorf("failed to release lock for inode %d by owner %d, error : %s", j.inode, j.owner, errno)
}
j.localLock.Unlock()
}

func (j *jfsFLock) GetRLock(ctx context.Context, timeout *minio.DynamicTimeout) (newCtx context.Context, timedOutErr error) {
return j.getFlockWithTimeOut(ctx, meta.F_RDLCK, timeout)
}

func (j *jfsFLock) RUnlock() {
j.Unlock()
if j.inode == 0 {
return
}
if errno := j.meta.Flock(mctx, j.inode, j.owner, meta.F_UNLCK, true); errno != 0 {
logger.Errorf("failed to release lock for inode %d by owner %d, error : %s", j.inode, j.owner, errno)
}
j.localLock.RUnlock()
}

func (n *jfsObjects) NewNSLock(bucket string, objects ...string) minio.RWLocker {
Expand Down
100 changes: 100 additions & 0 deletions pkg/gateway/gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* JuiceFS, Copyright 2024 Juicedata, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package gateway

import (
"context"
"errors"
"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/fs"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/object"
"github.com/juicedata/juicefs/pkg/vfs"
minio "github.com/minio/minio/cmd"
"os"
"testing"
"time"
)

func TestGatewayLock(t *testing.T) {
m := meta.NewClient("memkv://", nil)
format := &meta.Format{
Name: "test",
BlockSize: 4096,
Capacity: 1 << 30,
DirStats: true,
}
_ = m.Init(format, true)
var conf = vfs.Config{
Meta: meta.DefaultConf(),
Chunk: &chunk.Config{
BlockSize: format.BlockSize << 10,
MaxUpload: 1,
BufferSize: 100 << 20,
},
DirEntryTimeout: time.Millisecond * 100,
EntryTimeout: time.Millisecond * 100,
AttrTimeout: time.Millisecond * 100,
}
objStore, _ := object.CreateStorage("mem", "", "", "", "")
store := chunk.NewCachedStore(objStore, *conf.Chunk, nil)
jfs, err := fs.NewFileSystem(&conf, m, store)
if err != nil {
t.Fatalf("initialize failed: %s", err)
}
jfsObj := &jfsObjects{fs: jfs, conf: &conf, listPool: minio.NewTreeWalkPool(time.Minute * 30), gConf: &Config{Umask: 022}, nsMutex: minio.NewNSLock(false)}
mctx = meta.NewContext(uint32(os.Getpid()), uint32(os.Getuid()), []uint32{uint32(os.Getgid())})
if err := jfs.Mkdir(mctx, minio.MinioMetaBucket, 0777, 022); err != 0 {
t.Fatalf("mkdir failed: %s", err)
}

rwLocker := jfsObj.NewNSLock(minio.MinioMetaBucket, minio.MinioMetaLockFile)

if _, err := rwLocker.GetLock(context.Background(), minio.NewDynamicTimeout(2*time.Second, 1*time.Second)); err != nil {
t.Fatalf("get lock failed: %s", err)
}
if _, err := rwLocker.GetLock(context.Background(), minio.NewDynamicTimeout(2*time.Second, 1*time.Second)); !errors.As(err, &minio.OperationTimedOut{}) {
t.Fatalf("GetLock should return timeout error: %s", err)
}
rwLocker.Unlock()

if _, err := rwLocker.GetRLock(context.Background(), minio.NewDynamicTimeout(2*time.Second, 1*time.Second)); err != nil {
t.Fatalf("get lock failed: %s", err)
}
if _, err := rwLocker.GetRLock(context.Background(), minio.NewDynamicTimeout(2*time.Second, 1*time.Second)); err != nil {
t.Fatalf("GetRLock should return nil: %s", err)
}
rwLocker.RUnlock()
rwLocker.RUnlock()

if _, err := rwLocker.GetLock(context.Background(), minio.NewDynamicTimeout(2*time.Second, 1*time.Second)); err != nil {
t.Fatalf("get lock failed: %s", err)
}
if _, err := rwLocker.GetRLock(context.Background(), minio.NewDynamicTimeout(2*time.Second, 1*time.Second)); !errors.As(err, &minio.OperationTimedOut{}) {
t.Fatalf("GetRLock should return timeout error: %s", err)
}
rwLocker.Unlock()

if _, err := rwLocker.GetRLock(context.Background(), minio.NewDynamicTimeout(2*time.Second, 1*time.Second)); err != nil {
t.Fatalf("GetRLock failed: %s", err)
}
if _, err := rwLocker.GetLock(context.Background(), minio.NewDynamicTimeout(2*time.Second, 1*time.Second)); !errors.As(err, &minio.OperationTimedOut{}) {
t.Fatalf("GetRLock should return timeout error: %s", err)
}
rwLocker.RUnlock()

}

0 comments on commit b4aadd3

Please sign in to comment.