Skip to content

Commit

Permalink
gateway: fix etag and set xattr before complete multipart rename to e…
Browse files Browse the repository at this point in the history
…nsure consistency (#5258)
  • Loading branch information
zhijian-pro authored Oct 31, 2024
1 parent 30ab60e commit 6493c01
Showing 1 changed file with 79 additions and 65 deletions.
144 changes: 79 additions & 65 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,27 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
logger.Errorf("copy %s to %s: %s", src, tmp, err)
return
}

var etag []byte
if n.gConf.KeepEtag {
etag, _ = n.fs.GetXattr(mctx, src, s3Etag)
if len(etag) != 0 {
eno = n.fs.SetXattr(mctx, tmp, s3Etag, etag, 0)
if eno != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", tmp, s3Etag, etag, 0)
}
}
}

var tagStr string
if n.gConf.ObjTag && srcInfo.UserDefined != nil {
if tagStr = srcInfo.UserDefined[xhttp.AmzObjectTagging]; tagStr != "" {
if eno := n.fs.SetXattr(mctx, tmp, s3Tags, []byte(tagStr), 0); eno != 0 {
logger.Errorf("set object tags error, path: %s,value: %s error %s", tmp, tagStr, eno)
}
}
}

eno = n.fs.Rename(mctx, tmp, dst, 0)
if eno == syscall.ENOENT {
if err = n.mkdirAll(ctx, path.Dir(dst)); err != nil {
Expand All @@ -577,26 +598,6 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
return
}

var etag []byte
if n.gConf.KeepEtag {
etag, _ = n.fs.GetXattr(mctx, src, s3Etag)
if len(etag) != 0 {
eno = n.fs.SetXattr(mctx, dst, s3Etag, etag, 0)
if eno != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", dst, s3Etag, etag, 0)
}
}
}

var tagStr string
if n.gConf.ObjTag && srcInfo.UserDefined != nil {
if tagStr = srcInfo.UserDefined[xhttp.AmzObjectTagging]; tagStr != "" {
if eno := n.fs.SetXattr(mctx, dst, s3Tags, []byte(tagStr), 0); eno != 0 {
logger.Errorf("set object tags error, path: %s,value: %s error %s", dst, tagStr, eno)
}
}
}

return minio.ObjectInfo{
Bucket: dstBucket,
Name: dstObject,
Expand Down Expand Up @@ -725,7 +726,7 @@ func (n *jfsObjects) mkdirAll(ctx context.Context, p string) error {
return eno
}

func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (err error) {
func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions, applyObjTaggingFunc func(tmpName string)) (err error) {
tmpname := n.tpath(bucket, "tmp", minio.MustGetUUID())
f, eno := n.fs.Create(mctx, tmpname, 0666, n.gConf.Umask)
if eno == syscall.ENOENT {
Expand Down Expand Up @@ -766,6 +767,9 @@ func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *mi
if err != nil {
return
}

applyObjTaggingFunc(tmpname)

eno = n.fs.Rename(mctx, tmpname, object, 0)
if eno == syscall.ENOENT {
if err = n.mkdirAll(ctx, path.Dir(object)); err != nil {
Expand All @@ -785,7 +789,8 @@ func (n *jfsObjects) PutObject(ctx context.Context, bucket string, object string
if err = n.checkBucket(ctx, bucket); err != nil {
return
}

var tagStr string
var etag string
p := n.path(bucket, object)
if strings.HasSuffix(object, sep) {
if err = n.mkdirAll(ctx, p); err != nil {
Expand All @@ -802,30 +807,30 @@ func (n *jfsObjects) PutObject(ctx context.Context, bucket string, object string
}
// if the put object is a directory, set its atime to 0
n.setFileAtime(p, 0)
} else if err = n.putObject(ctx, bucket, p, r, opts); err != nil {
return
} else {
if err = n.putObject(ctx, bucket, p, r, opts, func(tmpName string) {
etag = r.MD5CurrentHexString()
if n.gConf.KeepEtag && !strings.HasSuffix(object, sep) {
if eno := n.fs.SetXattr(mctx, tmpName, s3Etag, []byte(etag), 0); eno != 0 {
logger.Errorf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", tmpName, s3Etag, etag, 0)
}
}
// tags: key1=value1&key2=value2&key3=value3
if n.gConf.ObjTag && opts.UserDefined != nil {
if tagStr = opts.UserDefined[xhttp.AmzObjectTagging]; tagStr != "" {
if eno := n.fs.SetXattr(mctx, tmpName, s3Tags, []byte(tagStr), 0); eno != 0 {
logger.Errorf("set object tags error, path: %s,value: %s error: %s", tmpName, tagStr, eno)
}
}
}
}); err != nil {
return
}
}
fi, eno := n.fs.Stat(mctx, p)
if eno != 0 {
return objInfo, jfsToObjectErr(ctx, eno, bucket, object)
}
etag := r.MD5CurrentHexString()
if n.gConf.KeepEtag && !strings.HasSuffix(object, sep) {
eno = n.fs.SetXattr(mctx, p, s3Etag, []byte(etag), 0)
if eno != 0 {
logger.Errorf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", p, s3Etag, etag, 0)
}
}
// tags: key1=value1&key2=value2&key3=value3
var tagStr string
if n.gConf.ObjTag && opts.UserDefined != nil {
if tagStr = opts.UserDefined[xhttp.AmzObjectTagging]; tagStr != "" {
if eno := n.fs.SetXattr(mctx, p, s3Tags, []byte(tagStr), 0); eno != 0 {
logger.Errorf("set object tags error, path: %s,value: %s error: %s", p, tagStr, eno)
}
}
}

return minio.ObjectInfo{
Bucket: bucket,
Name: object,
Expand Down Expand Up @@ -1021,14 +1026,16 @@ func (n *jfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
return
}
p := n.ppath(bucket, uploadID, strconv.Itoa(partID))
if err = n.putObject(ctx, bucket, p, r, opts); err != nil {
var etag string
if err = n.putObject(ctx, bucket, p, r, opts, func(tmpName string) {
etag = r.MD5CurrentHexString()
if n.fs.SetXattr(mctx, tmpName, s3Etag, []byte(etag), 0) != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", tmpName, s3Etag, etag, 0)
}
}); err != nil {
err = jfsToObjectErr(ctx, err, bucket, object)
return
}
etag := r.MD5CurrentHexString()
if n.fs.SetXattr(mctx, p, s3Etag, []byte(etag), 0) != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", p, s3Etag, etag, 0)
}
info.PartNumber = partID
info.ETag = etag
info.LastModified = minio.UTCNow()
Expand Down Expand Up @@ -1074,6 +1081,32 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object
total += copied
}

// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := minio.ComputeCompleteMultipartMD5(parts)
if n.gConf.KeepEtag {
eno = n.fs.SetXattr(mctx, tmp, s3Etag, []byte(s3MD5), 0)
if eno != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", tmp, s3Etag, s3MD5, 0)
}
}

var tagStr []byte
if n.gConf.ObjTag {
var eno syscall.Errno
if tagStr, eno = n.fs.GetXattr(mctx, n.upath(bucket, uploadID), s3Tags); eno != 0 && eno != meta.ENOATTR {
logger.Errorf("get object tags error, path: %s, error: %s", n.upath(bucket, uploadID), eno)
} else if eno = n.fs.SetXattr(mctx, tmp, s3Tags, tagStr, 0); eno != 0 {
logger.Errorf("set object tags error, path: %s, tags: %s, error: %s", tmp, string(tagStr), eno)
}
if tagStr, eno = n.fs.GetXattr(mctx, n.upath(bucket, uploadID), s3Tags); eno != 0 {
if eno != meta.ENOATTR {
logger.Errorf("get object tags error, path: %s, error: %s", n.upath(bucket, uploadID), eno)
}
} else if eno = n.fs.SetXattr(mctx, tmp, s3Tags, tagStr, 0); eno != 0 {
logger.Errorf("set object tags error, path: %s, tags: %s, error: %s", tmp, string(tagStr), eno)
}
}

name := n.path(bucket, object)
eno = n.fs.Rename(mctx, tmp, name, 0)
if eno == syscall.ENOENT {
Expand All @@ -1099,25 +1132,6 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object
return
}

// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := minio.ComputeCompleteMultipartMD5(parts)
if n.gConf.KeepEtag {
eno = n.fs.SetXattr(mctx, name, s3Etag, []byte(s3MD5), 0)
if eno != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", name, s3Etag, s3MD5, 0)
}
}

var tagStr []byte
if n.gConf.ObjTag {
var eno syscall.Errno
if tagStr, eno = n.fs.GetXattr(mctx, n.upath(bucket, uploadID), s3Tags); eno != 0 && eno != meta.ENOATTR {
logger.Errorf("get object tags error, path: %s, error: %s", n.upath(bucket, uploadID), eno)
} else if eno = n.fs.SetXattr(mctx, name, s3Tags, tagStr, 0); eno != 0 {
logger.Errorf("set object tags error, path: %s, tags: %s, error: %s", name, string(tagStr), eno)
}
}

// remove parts
_ = n.fs.Rmr(mctx, n.upath(bucket, uploadID))
return minio.ObjectInfo{
Expand Down

0 comments on commit 6493c01

Please sign in to comment.