diff --git a/internal/cfg/config.go b/internal/cfg/config.go
index 6db7b53..501c29f 100644
--- a/internal/cfg/config.go
+++ b/internal/cfg/config.go
@@ -103,6 +103,7 @@ type FlagStorage struct {
 	PartSizes          []PartSizeConfig
 	UsePatch           bool
 	DropPatchConflicts bool
+	PreferPatchUploads bool
 
 	// Debugging
 	DebugMain  bool
diff --git a/internal/cfg/flags.go b/internal/cfg/flags.go
index d6f101c..7350b32 100644
--- a/internal/cfg/flags.go
+++ b/internal/cfg/flags.go
@@ -433,7 +433,7 @@ MISC OPTIONS:
 
 		cli.BoolFlag{
 			Name:  "enable-patch",
-			Usage: "Use PATCH method to upload object data changes to s3. Yandex only. (default: off)",
+			Usage: "Use PATCH method to upload object data changes to S3. All PATCH-related flags are Yandex only. (default: off)",
 		},
 
 		cli.BoolFlag{
@@ -441,6 +441,13 @@ MISC OPTIONS:
 			Usage: "Drop local changes in case of conflicting concurrent PATCH updates. (default: off)",
 		},
 
+		cli.BoolFlag{
+			Name:  "prefer-patch-uploads",
+			Usage: "When uploading new objects, prefer PATCH requests over the standard multipart upload process. " +
+				"Changes appear faster in exchange for slower uploads due to limited parallelism. " +
+				"Must be used together with the --enable-patch flag. (default: off)",
+		},
+
 		cli.IntFlag{
 			Name:  "max-merge-copy",
 			Value: 0,
@@ -860,6 +867,7 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
 		CacheFileMode:      os.FileMode(c.Int("cache-file-mode")),
 		UsePatch:           c.Bool("enable-patch"),
 		DropPatchConflicts: c.Bool("drop-patch-conflicts"),
+		PreferPatchUploads: c.Bool("prefer-patch-uploads"),
 
 		// Common Backend Config
 		Endpoint:           c.String("endpoint"),
diff --git a/internal/file.go b/internal/file.go
index b52995d..7bed463 100644
--- a/internal/file.go
+++ b/internal/file.go
@@ -716,7 +716,11 @@ func (inode *Inode) sendUpload(priority int) bool {
 		if inode.IsFlushing > 0 {
 			return false
 		}
-		inode.sendStartMultipart()
+		if inode.fs.flags.UsePatch && inode.fs.flags.PreferPatchUploads {
+			inode.uploadMinMultipart()
+		} else {
+			inode.sendStartMultipart()
+		}
 		return true
 	}
 
@@ -954,6 +958,15 @@ func (inode *Inode) sendStartMultipart() {
 	atomic.AddInt64(&inode.fs.stats.flushes, 1)
 	atomic.AddInt64(&inode.fs.activeFlushers, 1)
 	go func() {
+		inode.beginMultipartUpload(cloud, key)
+		inode.IsFlushing -= inode.fs.flags.MaxParallelParts
+		atomic.AddInt64(&inode.fs.activeFlushers, -1)
+		inode.fs.WakeupFlusher()
+		inode.mu.Unlock()
+	}()
+}
+
+func (inode *Inode) beginMultipartUpload(cloud StorageBackend, key string) {
 		params := &MultipartBlobBeginInput{
 			Key:         key,
 			ContentType: inode.fs.flags.GetMimeType(key),
@@ -973,11 +986,6 @@
 			log.Debugf("Started multi-part upload of object %v", key)
 			inode.mpu = resp
 		}
-		inode.IsFlushing -= inode.fs.flags.MaxParallelParts
-		atomic.AddInt64(&inode.fs.activeFlushers, -1)
-		inode.fs.WakeupFlusher()
-		inode.mu.Unlock()
-	}()
 }
 
 func (inode *Inode) sendUploadParts(priority int) (bool, bool) {
@@ -1090,6 +1098,61 @@ func (inode *Inode) uploadedAsMultipart() bool {
 	return strings.Contains(inode.knownETag, "-")
 }
 
+func (inode *Inode) uploadMinMultipart() {
+	inode.IsFlushing += inode.fs.flags.MaxParallelParts
+	atomic.AddInt64(&inode.fs.activeFlushers, 1)
+
+	cloud, key := inode.cloud()
+	if inode.isDir() {
+		key += "/"
+	}
+
+	go func() {
+		defer func() {
+			inode.IsFlushing -= inode.fs.flags.MaxParallelParts
+			atomic.AddInt64(&inode.fs.activeFlushers, -1)
+			inode.fs.WakeupFlusher()
+			inode.mu.Unlock()
+		}()
+
+		atomic.AddInt64(&inode.fs.stats.flushes, 1)
+		inode.beginMultipartUpload(cloud, key)
+		if inode.mpu == nil {
+			return
+		}
+
+		if ok := inode.syncFlushPartsUpTo(2); !ok {
+			inode.abortMultipart()
+			return
+		}
+
+		partOffset, partSize := inode.fs.partRange(1)
+		if inode.Attributes.Size < partOffset+partSize {
+			partSize = inode.Attributes.Size - partOffset
+		}
+
+		atomic.AddInt64(&inode.fs.stats.flushes, 1)
+		inode.commitMultipartUpload(2, partOffset+partSize)
+		if inode.mpu != nil {
+			inode.abortMultipart()
+		}
+	}()
+}
+
+func (inode *Inode) syncFlushPartsUpTo(part uint64) bool {
+	if inode.mpu == nil {
+		return false
+	}
+	for i := uint64(0); i < part; i++ {
+		atomic.AddInt64(&inode.fs.stats.flushes, 1)
+		inode.flushPart(i)
+		if inode.mpu == nil || inode.mpu.Parts[i] == nil {
+			return false
+		}
+	}
+	return true
+}
+
 func (inode *Inode) patchObjectRanges() (initiated bool) {
 	smallFile := inode.Attributes.Size <= inode.fs.flags.SinglePartMB*1024*1024
 	wantFlush := inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0
@@ -1116,7 +1179,11 @@ func (inode *Inode) patchObjectRanges() (initiated bool) {
 			return false
 		}
 
-		_, prevSize := inode.fs.partRange(MaxUInt64(part-1, 0))
+		var prevPart uint64
+		if part > 0 {
+			prevPart = part - 1
+		}
+		_, prevSize := inode.fs.partRange(prevPart)
 		partEnd, rangeBorder := partStart+partSize, partSize != prevSize
 		appendPatch, newPart := partEnd > inode.knownSize, partStart == inode.knownSize
 
@@ -1345,6 +1412,15 @@ func (inode *Inode) resetCache() {
 	}
 	// And abort multipart upload, too
 	if inode.mpu != nil {
+		inode.abortMultipart()
+	}
+	inode.userMetadataDirty = 0
+	inode.SetCacheState(ST_CACHED)
+	// Invalidate metadata entry
+	inode.SetAttrTime(time.Time{})
+}
+
+func (inode *Inode) abortMultipart() {
 		cloud, key := inode.cloud()
 		go func(mpu *MultipartBlobCommitInput) {
 			_, abortErr := cloud.MultipartBlobAbort(mpu)
@@ -1354,11 +1430,6 @@
 		}(inode.mpu)
 		inode.mpu = nil
 	}
-	inode.userMetadataDirty = 0
-	inode.SetCacheState(ST_CACHED)
-	// Invalidate metadata entry
-	inode.SetAttrTime(time.Time{})
-}
 
 func (inode *Inode) flushSmallObject() {
 
@@ -1422,13 +1493,7 @@
 	if inode.mpu != nil {
 		// Abort and forget abort multipart upload, because otherwise we may
 		// not be able to proceed to rename - it waits until inode.mpu == nil
-		go func(mpu *MultipartBlobCommitInput) {
-			_, abortErr := cloud.MultipartBlobAbort(mpu)
-			if abortErr != nil {
-				log.Warnf("Failed to abort multi-part upload of object %v: %v", key, abortErr)
-			}
-		}(inode.mpu)
-		inode.mpu = nil
+		inode.abortMultipart()
 	}
 	inode.mu.Unlock()
 	inode.fs.addInflightChange(key)
@@ -1677,6 +1742,10 @@ func (inode *Inode) completeMultipart() {
 		// Error, or already flushed, or conflict => do not complete
 		return
 	}
+	inode.commitMultipartUpload(numParts, finalSize)
+}
+
+func (inode *Inode) commitMultipartUpload(numParts, finalSize uint64) {
 	cloud, key := inode.cloud()
 	if inode.oldParent != nil {
 		// Always apply modifications before moving
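
Reviewer note (not part of the patch): a minimal, self-contained sketch of how the two flags are expected to interact when picking the upload path, mirroring the new branch in sendUpload above. The uploadFlags struct and chooseUploadPath helper are illustrative stand-ins rather than symbols from the repository; a mount invocation would look something like "geesefs --enable-patch --prefer-patch-uploads <bucket> <mountpoint>".

// Illustrative sketch only; simplified stand-ins for FlagStorage and the
// branch this patch adds to (*Inode).sendUpload.
package main

import "fmt"

type uploadFlags struct {
	UsePatch           bool // --enable-patch
	PreferPatchUploads bool // --prefer-patch-uploads
}

// chooseUploadPath picks the PATCH-based path only when both flags are set;
// --prefer-patch-uploads alone has no effect, matching the flag's help text.
func chooseUploadPath(f uploadFlags) string {
	if f.UsePatch && f.PreferPatchUploads {
		return "uploadMinMultipart: minimal multipart upload, later changes sent via PATCH"
	}
	return "sendStartMultipart: standard parallel multipart upload"
}

func main() {
	fmt.Println(chooseUploadPath(uploadFlags{UsePatch: true, PreferPatchUploads: true}))
	fmt.Println(chooseUploadPath(uploadFlags{UsePatch: false, PreferPatchUploads: true}))
}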