diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index f5363ba957d8..105b63cedd18 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -48,8 +48,9 @@ import ( ) const ( - sep = "/" - metaBucket = ".sys" + sep = "/" + metaBucket = ".sys" + subDirPrefix = 3 // 16^3=4096 slots ) var mctx meta.Context @@ -186,10 +187,14 @@ func (n *jfsObjects) tpath(p ...string) string { } func (n *jfsObjects) upath(bucket, uploadID string) string { - return n.tpath(bucket, "uploads", uploadID) + return n.tpath(bucket, "uploads", uploadID[:subDirPrefix], uploadID) } func (n *jfsObjects) ppath(bucket, uploadID, part string) string { + return n.tpath(bucket, "uploads", uploadID[:subDirPrefix], uploadID, part) +} + +func (n *jfsObjects) ppathFlat(bucket, uploadID, part string) string { // compatible with tmp files uploaded by old versions(<1.2) return n.tpath(bucket, "uploads", uploadID, part) } @@ -536,7 +541,8 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu if minio.IsStringEqual(src, dst) { return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{}) } - tmp := n.tpath(dstBucket, "tmp", minio.MustGetUUID()) + uuid := minio.MustGetUUID() + tmp := n.tpath(dstBucket, "tmp", uuid[:subDirPrefix], uuid) f, eno := n.fs.Create(mctx, tmp, 0666, n.gConf.Umask) if eno == syscall.ENOENT { _ = n.mkdirAll(ctx, path.Dir(tmp)) @@ -725,7 +731,8 @@ func (n *jfsObjects) mkdirAll(ctx context.Context, p string) error { } func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions, applyObjTaggingFunc func(tmpName string)) (err error) { - tmpname := n.tpath(bucket, "tmp", minio.MustGetUUID()) + uuid := minio.MustGetUUID() + tmpname := n.tpath(bucket, "tmp", uuid[:subDirPrefix], uuid) f, eno := n.fs.Create(mctx, tmpname, 0666, n.gConf.Umask) if eno == syscall.ENOENT { _ = n.mkdirAll(ctx, path.Dir(tmpname)) @@ -880,7 +887,7 @@ func (n *jfsObjects) ListMultipartUploads(ctx context.Context, bucket string, pr return // no found } defer f.Close(mctx) - entries, eno := f.ReaddirPlus(mctx, 0) + parents, eno := f.ReaddirPlus(mctx, 0) if eno != 0 { err = jfsToObjectErr(ctx, eno, bucket) return @@ -891,22 +898,38 @@ func (n *jfsObjects) ListMultipartUploads(ctx context.Context, bucket string, pr lmi.MaxUploads = maxUploads lmi.Delimiter = delimiter commPrefixSet := make(map[string]struct{}) - for _, e := range entries { - uploadID := string(e.Name) - // todo: parallel - object_, eno := n.fs.GetXattr(mctx, n.upath(bucket, uploadID), uploadKeyName) + for _, p := range parents { + f, eno := n.fs.Open(mctx, n.tpath(bucket, "uploads", string(p.Name)), 0) if eno != 0 { - logger.Warnf("get object xattr error %s: %s, ignore this item", n.upath(bucket, uploadID), eno) - continue + return + } + defer f.Close(mctx) + entries, eno := f.ReaddirPlus(mctx, 0) + if eno != 0 { + err = jfsToObjectErr(ctx, eno, bucket) + return } - object := string(object_) - if strings.HasPrefix(object, prefix) { - if keyMarker != "" && object+uploadID > keyMarker+uploadIDMarker || keyMarker == "" { - lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{ - Object: object, - UploadID: uploadID, - Initiated: time.Unix(e.Attr.Atime, int64(e.Attr.Atimensec)), - }) + + for _, e := range entries { + if len(e.Name) != 36 { + continue // not an uuid + } + uploadID := string(e.Name) + // todo: parallel + object_, eno := n.fs.GetXattr(mctx, n.upath(bucket, uploadID), uploadKeyName) + if eno != 0 { + logger.Warnf("get object xattr error %s: %s, ignore this item", n.upath(bucket, uploadID), eno) + continue + } + object := string(object_) + if strings.HasPrefix(object, prefix) { + if keyMarker != "" && object+uploadID > keyMarker+uploadIDMarker || keyMarker == "" { + lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{ + Object: object, + UploadID: uploadID, + Initiated: time.Unix(e.Attr.Atime, int64(e.Attr.Atimensec)), + }) + } } } } @@ -1071,6 +1094,10 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object for _, part := range parts { p := n.ppath(bucket, uploadID, strconv.Itoa(part.PartNumber)) copied, eno := n.fs.CopyFileRange(mctx, p, 0, tmp, total, 5<<30) + if eno == syscall.ENOENT { // try lookup from old path + p = n.ppathFlat(bucket, uploadID, strconv.Itoa(part.PartNumber)) + copied, eno = n.fs.CopyFileRange(mctx, p, 0, tmp, total, 5<<30) + } if eno != 0 { err = jfsToObjectErr(ctx, eno, bucket, object, uploadID) logger.Errorf("merge parts: %s", err) @@ -1148,7 +1175,7 @@ func (n *jfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u } func (n *jfsObjects) cleanup() { - for t := range time.Tick(24 * time.Hour) { + for range time.Tick(24 * time.Hour) { // default bucket tmp dirs tmpDirs := []string{".sys/tmp/", ".sys/uploads/"} if n.gConf.MultiBucket { @@ -1163,27 +1190,40 @@ func (n *jfsObjects) cleanup() { } } for _, dir := range tmpDirs { - f, errno := n.fs.Open(mctx, dir, 0) - if errno != 0 { + n.cleanupDir(dir) + } + } +} + +func (n *jfsObjects) cleanupDir(dir string) bool { + f, errno := n.fs.Open(mctx, dir, 0) + if errno != 0 { + return false + } + defer f.Close(mctx) + entries, _ := f.ReaddirPlus(mctx, 0) + now := time.Now() + deleted := 0 + for _, entry := range entries { + dirPath := n.path(dir, string(entry.Name)) + if entry.Attr.Typ == meta.TypeDirectory && len(entry.Name) == subDirPrefix { + if !n.cleanupDir(strings.TrimPrefix(dirPath, "/")) { continue } - entries, _ := f.ReaddirPlus(mctx, 0) - for _, entry := range entries { - if _, err := uuid.Parse(string(entry.Name)); err != nil { - continue - } - if t.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour { - p := n.path(dir, string(entry.Name)) - if errno := n.fs.Rmr(mctx, p); errno != 0 { - logger.Errorf("failed to delete expired temporary files path: %s,", p) - } else { - logger.Infof("delete expired temporary files path: %s, mtime: %s", p, time.Unix(entry.Attr.Mtime, 0).Format(time.RFC3339)) - } - } + } else if _, err := uuid.Parse(string(entry.Name)); err != nil { + logger.Warnf("unexpected file path: %s", dirPath) + continue + } + if now.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour { + if errno = n.fs.Rmr(mctx, dirPath); errno != 0 { + logger.Errorf("failed to delete expired temporary files path: %s, err: %s", dirPath, errno) + } else { + deleted += 1 + logger.Infof("delete expired temporary files path: %s, mtime: %s", dirPath, time.Unix(entry.Attr.Mtime, 0).Format(time.RFC3339)) } - _ = f.Close(mctx) } } + return deleted == len(entries) } type jfsFLock struct {