Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gateway: add one more hierarchy to upload tmp/multiupload dir to reduce txn conflicts #5549

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 77 additions & 37 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ import (
)

const (
sep = "/"
metaBucket = ".sys"
sep = "/"
metaBucket = ".sys"
subDirPrefix = 3 // 16^3=4096 slots
)

var mctx meta.Context
Expand Down Expand Up @@ -186,10 +187,14 @@ func (n *jfsObjects) tpath(p ...string) string {
}

func (n *jfsObjects) upath(bucket, uploadID string) string {
return n.tpath(bucket, "uploads", uploadID)
return n.tpath(bucket, "uploads", uploadID[:subDirPrefix], uploadID)
}

// ppath returns the temporary path of a single uploaded part for a multipart
// upload. Parts are nested under an extra sub-directory named by the first
// subDirPrefix (3) hex characters of uploadID, fanning entries out across
// 16^3 = 4096 slots to reduce transaction conflicts on one parent directory.
func (n *jfsObjects) ppath(bucket, uploadID, part string) string {
return n.tpath(bucket, "uploads", uploadID[:subDirPrefix], uploadID, part)
}

// ppathFlat returns the legacy flat part path (no uploadID-prefix
// sub-directory level). Kept as a read fallback so parts written by old
// versions (<1.2) remain reachable after the layout change.
func (n *jfsObjects) ppathFlat(bucket, uploadID, part string) string { // compatible with tmp files uploaded by old versions(<1.2)
return n.tpath(bucket, "uploads", uploadID, part)
}

Expand Down Expand Up @@ -536,7 +541,8 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
if minio.IsStringEqual(src, dst) {
return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{})
}
tmp := n.tpath(dstBucket, "tmp", minio.MustGetUUID())
uuid := minio.MustGetUUID()
tmp := n.tpath(dstBucket, "tmp", uuid[:subDirPrefix], uuid)
f, eno := n.fs.Create(mctx, tmp, 0666, n.gConf.Umask)
if eno == syscall.ENOENT {
_ = n.mkdirAll(ctx, path.Dir(tmp))
Expand Down Expand Up @@ -725,7 +731,8 @@ func (n *jfsObjects) mkdirAll(ctx context.Context, p string) error {
}

func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions, applyObjTaggingFunc func(tmpName string)) (err error) {
tmpname := n.tpath(bucket, "tmp", minio.MustGetUUID())
uuid := minio.MustGetUUID()
tmpname := n.tpath(bucket, "tmp", uuid[:subDirPrefix], uuid)
f, eno := n.fs.Create(mctx, tmpname, 0666, n.gConf.Umask)
if eno == syscall.ENOENT {
_ = n.mkdirAll(ctx, path.Dir(tmpname))
Expand Down Expand Up @@ -880,7 +887,7 @@ func (n *jfsObjects) ListMultipartUploads(ctx context.Context, bucket string, pr
return // no found
}
defer f.Close(mctx)
entries, eno := f.ReaddirPlus(mctx, 0)
parents, eno := f.ReaddirPlus(mctx, 0)
if eno != 0 {
err = jfsToObjectErr(ctx, eno, bucket)
return
Expand All @@ -891,22 +898,38 @@ func (n *jfsObjects) ListMultipartUploads(ctx context.Context, bucket string, pr
lmi.MaxUploads = maxUploads
lmi.Delimiter = delimiter
commPrefixSet := make(map[string]struct{})
for _, e := range entries {
uploadID := string(e.Name)
// todo: parallel
object_, eno := n.fs.GetXattr(mctx, n.upath(bucket, uploadID), uploadKeyName)
for _, p := range parents {
f, eno := n.fs.Open(mctx, n.tpath(bucket, "uploads", string(p.Name)), 0)
if eno != 0 {
logger.Warnf("get object xattr error %s: %s, ignore this item", n.upath(bucket, uploadID), eno)
continue
return
}
defer f.Close(mctx)
entries, eno := f.ReaddirPlus(mctx, 0)
if eno != 0 {
err = jfsToObjectErr(ctx, eno, bucket)
return
}
object := string(object_)
if strings.HasPrefix(object, prefix) {
if keyMarker != "" && object+uploadID > keyMarker+uploadIDMarker || keyMarker == "" {
lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{
Object: object,
UploadID: uploadID,
Initiated: time.Unix(e.Attr.Atime, int64(e.Attr.Atimensec)),
})

for _, e := range entries {
if len(e.Name) != 36 {
continue // not an uuid
}
uploadID := string(e.Name)
// todo: parallel
object_, eno := n.fs.GetXattr(mctx, n.upath(bucket, uploadID), uploadKeyName)
if eno != 0 {
logger.Warnf("get object xattr error %s: %s, ignore this item", n.upath(bucket, uploadID), eno)
continue
}
object := string(object_)
if strings.HasPrefix(object, prefix) {
if keyMarker != "" && object+uploadID > keyMarker+uploadIDMarker || keyMarker == "" {
lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{
Object: object,
UploadID: uploadID,
Initiated: time.Unix(e.Attr.Atime, int64(e.Attr.Atimensec)),
})
}
}
}
}
Expand Down Expand Up @@ -1071,6 +1094,10 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object
for _, part := range parts {
p := n.ppath(bucket, uploadID, strconv.Itoa(part.PartNumber))
copied, eno := n.fs.CopyFileRange(mctx, p, 0, tmp, total, 5<<30)
if eno == syscall.ENOENT { // try lookup from old path
p = n.ppathFlat(bucket, uploadID, strconv.Itoa(part.PartNumber))
copied, eno = n.fs.CopyFileRange(mctx, p, 0, tmp, total, 5<<30)
}
if eno != 0 {
err = jfsToObjectErr(ctx, eno, bucket, object, uploadID)
logger.Errorf("merge parts: %s", err)
Expand Down Expand Up @@ -1148,7 +1175,7 @@ func (n *jfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
}

func (n *jfsObjects) cleanup() {
for t := range time.Tick(24 * time.Hour) {
for range time.Tick(24 * time.Hour) {
// default bucket tmp dirs
tmpDirs := []string{".sys/tmp/", ".sys/uploads/"}
if n.gConf.MultiBucket {
Expand All @@ -1163,27 +1190,40 @@ func (n *jfsObjects) cleanup() {
}
}
for _, dir := range tmpDirs {
f, errno := n.fs.Open(mctx, dir, 0)
if errno != 0 {
n.cleanupDir(dir)
}
}
}

func (n *jfsObjects) cleanupDir(dir string) bool {
f, errno := n.fs.Open(mctx, dir, 0)
if errno != 0 {
return false
}
defer f.Close(mctx)
entries, _ := f.ReaddirPlus(mctx, 0)
now := time.Now()
deleted := 0
for _, entry := range entries {
dirPath := n.path(dir, string(entry.Name))
if entry.Attr.Typ == meta.TypeDirectory && len(entry.Name) == subDirPrefix {
if !n.cleanupDir(strings.TrimPrefix(dirPath, "/")) {
continue
}
entries, _ := f.ReaddirPlus(mctx, 0)
for _, entry := range entries {
if _, err := uuid.Parse(string(entry.Name)); err != nil {
continue
}
if t.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour {
p := n.path(dir, string(entry.Name))
if errno := n.fs.Rmr(mctx, p); errno != 0 {
logger.Errorf("failed to delete expired temporary files path: %s,", p)
} else {
logger.Infof("delete expired temporary files path: %s, mtime: %s", p, time.Unix(entry.Attr.Mtime, 0).Format(time.RFC3339))
}
}
} else if _, err := uuid.Parse(string(entry.Name)); err != nil {
logger.Warnf("unexpected file path: %s", dirPath)
continue
}
if now.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour {
if errno = n.fs.Rmr(mctx, dirPath); errno != 0 {
logger.Errorf("failed to delete expired temporary files path: %s, err: %s", dirPath, errno)
} else {
deleted += 1
logger.Infof("delete expired temporary files path: %s, mtime: %s", dirPath, time.Unix(entry.Attr.Mtime, 0).Format(time.RFC3339))
}
_ = f.Close(mctx)
}
}
return deleted == len(entries)
}

type jfsFLock struct {
Expand Down
Loading