Skip to content

Commit

Permalink
Merge pull request #113 from bttrfl/patch-only-uploads
Browse files Browse the repository at this point in the history
"Prefer patch uploads" option
  • Loading branch information
vitalif authored May 23, 2024
2 parents af937c4 + 97469d3 commit 31f0562
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 20 deletions.
1 change: 1 addition & 0 deletions internal/cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type FlagStorage struct {
PartSizes []PartSizeConfig
UsePatch bool
DropPatchConflicts bool
PreferPatchUploads bool

// Debugging
DebugMain bool
Expand Down
10 changes: 9 additions & 1 deletion internal/cfg/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,14 +433,21 @@ MISC OPTIONS:

cli.BoolFlag{
Name: "enable-patch",
Usage: "Use PATCH method to upload object data changes to s3. Yandex only. (default: off)",
Usage: "Use PATCH method to upload object data changes to S3. All PATCH related flags are Yandex only. (default: off)",
},

cli.BoolFlag{
Name: "drop-patch-conflicts",
Usage: "Drop local changes in case of conflicting concurrent PATCH updates. (default: off)",
},

// prefer-patch-uploads is read by PopulateFlags into FlagStorage.PreferPatchUploads.
// Each Usage fragment ends with a space so the concatenated help text does not
// run sentences together.
cli.BoolFlag{
	Name: "prefer-patch-uploads",
	Usage: "When uploading new objects, prefer PATCH requests to standard multipart upload process. " +
		"This allows for changes to appear faster in exchange for slower upload speed due to limited parallelism. " +
		"Must be used with --enable-patch flag. (default: off)",
},

cli.IntFlag{
Name: "max-merge-copy",
Value: 0,
Expand Down Expand Up @@ -860,6 +867,7 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
CacheFileMode: os.FileMode(c.Int("cache-file-mode")),
UsePatch: c.Bool("enable-patch"),
DropPatchConflicts: c.Bool("drop-patch-conflicts"),
PreferPatchUploads: c.Bool("prefer-patch-uploads"),

// Common Backend Config
Endpoint: c.String("endpoint"),
Expand Down
107 changes: 88 additions & 19 deletions internal/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,11 @@ func (inode *Inode) sendUpload(priority int) bool {
if inode.IsFlushing > 0 {
return false
}
inode.sendStartMultipart()
if inode.fs.flags.UsePatch && inode.fs.flags.PreferPatchUploads {
inode.uploadMinMultipart()
} else {
inode.sendStartMultipart()
}
return true
}

Expand Down Expand Up @@ -954,6 +958,15 @@ func (inode *Inode) sendStartMultipart() {
atomic.AddInt64(&inode.fs.stats.flushes, 1)
atomic.AddInt64(&inode.fs.activeFlushers, 1)
go func() {
inode.beginMultipartUpload(cloud, key)
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
inode.mu.Unlock()
}()
}

func (inode *Inode) beginMultipartUpload(cloud StorageBackend, key string) {
params := &MultipartBlobBeginInput{
Key: key,
ContentType: inode.fs.flags.GetMimeType(key),
Expand All @@ -973,11 +986,6 @@ func (inode *Inode) sendStartMultipart() {
log.Debugf("Started multi-part upload of object %v", key)
inode.mpu = resp
}
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
inode.mu.Unlock()
}()
}

func (inode *Inode) sendUploadParts(priority int) (bool, bool) {
Expand Down Expand Up @@ -1090,6 +1098,61 @@ func (inode *Inode) uploadedAsMultipart() bool {
return strings.Contains(inode.knownETag, "-")
}

// uploadMinMultipart starts a background upload of the object as a two-part
// multipart upload: it begins the upload, flushes parts 0 and 1 one after
// another, and commits those two parts with a final size equal to the end of
// part 1 (clipped to the current file size).
//
// It is used by sendUpload in place of sendStartMultipart when both the
// UsePatch and PreferPatchUploads flags are set.
// NOTE(review): presumably the rest of the file is then uploaded with PATCH
// requests by other code paths — confirm.
//
// The inode is marked as flushing (IsFlushing is raised by MaxParallelParts
// and fs.activeFlushers by 1) before the goroutine is started; both are
// restored when the goroutine exits. The goroutine releases inode.mu on exit.
// NOTE(review): the lock is presumably taken inside beginMultipartUpload, as
// sendStartMultipart unlocks it after the same call — confirm.
func (inode *Inode) uploadMinMultipart() {
	inode.IsFlushing += inode.fs.flags.MaxParallelParts
	atomic.AddInt64(&inode.fs.activeFlushers, 1)

	cloud, key := inode.cloud()
	if inode.isDir() {
		key += "/"
	}

	go func() {
		// Undo the bookkeeping done above and wake the flusher regardless of
		// which path the upload takes.
		defer func() {
			inode.IsFlushing -= inode.fs.flags.MaxParallelParts
			atomic.AddInt64(&inode.fs.activeFlushers, -1)
			inode.fs.WakeupFlusher()
			inode.mu.Unlock()
		}()

		atomic.AddInt64(&inode.fs.stats.flushes, 1)
		inode.beginMultipartUpload(cloud, key)
		if inode.mpu == nil {
			// The multipart upload was not started; nothing to flush or abort.
			return
		}

		if ok := inode.syncFlushPartsUpTo(2); !ok {
			// syncFlushPartsUpTo also reports failure when inode.mpu has become
			// nil in the meantime. abortMultipart passes inode.mpu on without
			// checking it, so only abort an upload that still exists, as the
			// other call sites do.
			if inode.mpu != nil {
				inode.abortMultipart()
			}
			return
		}

		// The committed object ends at the end of part 1, or at the end of the
		// file if part 1 is shorter than a full part.
		// NOTE(review): if Attributes.Size could be smaller than partOffset the
		// unsigned subtraction below would wrap; this presumably cannot happen
		// once part 1 has been flushed successfully — confirm.
		partOffset, partSize := inode.fs.partRange(1)
		if inode.Attributes.Size < partOffset+partSize {
			partSize = inode.Attributes.Size - partOffset
		}

		atomic.AddInt64(&inode.fs.stats.flushes, 1)
		inode.commitMultipartUpload(2, partOffset+partSize)
		if inode.mpu != nil {
			// The upload is still pending after the commit attempt, so drop it.
			inode.abortMultipart()
		}
	}()
}

// syncFlushPartsUpTo calls flushPart for every part index in [0, part), in
// order, on the calling goroutine, bumping the fs flush counter once per part.
//
// It returns true only if, after each flush, the multipart upload still exists
// and the corresponding entry in inode.mpu.Parts is non-nil. It returns false
// immediately if there is no multipart upload on entry, if the upload
// disappears during flushing, or if a flushed part has no entry recorded.
func (inode *Inode) syncFlushPartsUpTo(part uint64) bool {
	if inode.mpu == nil {
		return false
	}

	var idx uint64
	for idx < part {
		atomic.AddInt64(&inode.fs.stats.flushes, 1)
		inode.flushPart(idx)

		// The upload may have been dropped while the part was being flushed.
		if inode.mpu == nil {
			return false
		}
		// A nil slot means this part was not recorded as uploaded.
		if inode.mpu.Parts[idx] == nil {
			return false
		}
		idx++
	}

	return true
}

func (inode *Inode) patchObjectRanges() (initiated bool) {
smallFile := inode.Attributes.Size <= inode.fs.flags.SinglePartMB*1024*1024
wantFlush := inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0
Expand All @@ -1116,7 +1179,11 @@ func (inode *Inode) patchObjectRanges() (initiated bool) {
return false
}

_, prevSize := inode.fs.partRange(MaxUInt64(part-1, 0))
var prevPart uint64
if part > 0 {
prevPart = part - 1
}
_, prevSize := inode.fs.partRange(prevPart)

partEnd, rangeBorder := partStart+partSize, partSize != prevSize
appendPatch, newPart := partEnd > inode.knownSize, partStart == inode.knownSize
Expand Down Expand Up @@ -1345,6 +1412,15 @@ func (inode *Inode) resetCache() {
}
// And abort multipart upload, too
if inode.mpu != nil {
inode.abortMultipart()
}
inode.userMetadataDirty = 0
inode.SetCacheState(ST_CACHED)
// Invalidate metadata entry
inode.SetAttrTime(time.Time{})
}

func (inode *Inode) abortMultipart() {
cloud, key := inode.cloud()
go func(mpu *MultipartBlobCommitInput) {
_, abortErr := cloud.MultipartBlobAbort(mpu)
Expand All @@ -1354,11 +1430,6 @@ func (inode *Inode) resetCache() {
}(inode.mpu)
inode.mpu = nil
}
inode.userMetadataDirty = 0
inode.SetCacheState(ST_CACHED)
// Invalidate metadata entry
inode.SetAttrTime(time.Time{})
}

func (inode *Inode) flushSmallObject() {

Expand Down Expand Up @@ -1422,13 +1493,7 @@ func (inode *Inode) flushSmallObject() {
if inode.mpu != nil {
// Abort and forget abort multipart upload, because otherwise we may
// not be able to proceed to rename - it waits until inode.mpu == nil
go func(mpu *MultipartBlobCommitInput) {
_, abortErr := cloud.MultipartBlobAbort(mpu)
if abortErr != nil {
log.Warnf("Failed to abort multi-part upload of object %v: %v", key, abortErr)
}
}(inode.mpu)
inode.mpu = nil
inode.abortMultipart()
}
inode.mu.Unlock()
inode.fs.addInflightChange(key)
Expand Down Expand Up @@ -1677,6 +1742,10 @@ func (inode *Inode) completeMultipart() {
// Error, or already flushed, or conflict => do not complete
return
}
inode.commitMultipartUpload(numParts, finalSize)
}

func (inode *Inode) commitMultipartUpload(numParts, finalSize uint64) {
cloud, key := inode.cloud()
if inode.oldParent != nil {
// Always apply modifications before moving
Expand Down

0 comments on commit 31f0562

Please sign in to comment.