Skip to content

Commit

Permalink
Slightly split buffer eviction code
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalif committed Oct 13, 2023
1 parent 068c622 commit 7858e1d
Showing 1 changed file with 79 additions and 55 deletions.
134 changes: 79 additions & 55 deletions internal/goofys.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,46 +566,20 @@ func (fs *Goofys) FreeSomeCleanBuffers(origSize int64) (int64, bool) {
if inode == nil {
continue
}
toFs := -1
inode.mu.Lock()
del := -1
i := 0
for ; i < len(inode.buffers); i++ {
toFs := -1
inode.buffers = filterBuffers(inode.buffers, func(buf, prev *FileBuffer) FilterAction {
if freed >= size {
break
return FA_STOP
}
buf := inode.buffers[i]
// Never evict buffers flushed in an incomplete (last) part
if buf.dirtyID == 0 || buf.state == BUF_FLUSHED_FULL {
if buf.ptr != nil && !inode.IsRangeLocked(buf.offset, buf.length, false) &&
// Skip recent buffers when possible
(skipRecent == 0 || buf.recency <= skipRecent) &&
// Do not evict modified header
(buf.state != BUF_FLUSHED_FULL || buf.offset >= fs.flags.PartSizes[0].PartSize) {
if fs.flags.CachePath != "" && !buf.onDisk {
if toFs == -1 {
toFs = 0
if fs.lfru.GetHits(inode.Id) >= fs.flags.CacheToDiskHits {
toFs = 1
}
}
if toFs > 0 {
// Evict to disk
err := inode.OpenCacheFD()
if err != nil {
toFs = 0
} else {
_, err := inode.DiskCacheFD.WriteAt(buf.data, int64(buf.offset))
if err != nil {
toFs = 0
log.Errorf("Couldn't write %v bytes at offset %v to %v: %v",
len(buf.data), buf.offset, fs.flags.CachePath+"/"+inode.FullName(), err)
} else {
buf.onDisk = true
}
}
}
}
fs.tryEvictToDisk(inode, buf, &toFs)
// Release memory
buf.ptr.refs--
if buf.ptr.refs == 0 {
Expand All @@ -615,27 +589,17 @@ func (fs *Goofys) FreeSomeCleanBuffers(origSize int64) (int64, bool) {
buf.ptr = nil
buf.data = nil
if buf.dirtyID == 0 && !buf.onDisk {
if del == -1 {
del = i
}
continue
return FA_DEL
} else if buf.state == BUF_FLUSHED_FULL {
// A flushed buffer can be removed at a cost of finalizing multipart upload
// to read it back later. However it's likely not a problem if we're uploading
// a large file because we may never need to read it back.
// One exception is that we don't do it with the header because various
// software commonly modifies header after writing the whole large file
prev := del-1
if prev < 0 {
prev = i-1
}
if prev >= 0 && inode.buffers[prev].state == BUF_FL_CLEARED &&
buf.offset == (inode.buffers[prev].offset + inode.buffers[prev].length) {
inode.buffers[prev].length += buf.length
if del == -1 {
del = i
}
continue
if prev != nil && prev.state == BUF_FL_CLEARED &&
buf.offset == (prev.offset + prev.length) {
prev.length += buf.length
return FA_DEL
} else {
buf.state = BUF_FL_CLEARED
}
Expand All @@ -644,16 +608,8 @@ func (fs *Goofys) FreeSomeCleanBuffers(origSize int64) (int64, bool) {
} else if inode.fs.partNum(buf.offset+buf.length-1) != inode.fs.partNum(inode.lastWriteEnd) {
haveDirty = true
}
if del >= 0 {
inode.buffers = append(inode.buffers[0 : del], inode.buffers[i : ]...)
i = del
del = -1
}
}
if del >= 0 {
inode.buffers = append(inode.buffers[0 : del], inode.buffers[i : ]...)
del = -1
}
return FA_LEAVE
})
inode.mu.Unlock()
if freed >= size {
break
Expand All @@ -669,6 +625,74 @@ func (fs *Goofys) FreeSomeCleanBuffers(origSize int64) (int64, bool) {
return freed, haveDirty
}

func (fs *Goofys) tryEvictToDisk(inode *Inode, buf *FileBuffer, toFs *int) {
if fs.flags.CachePath != "" && !buf.onDisk {
if *toFs == -1 {
*toFs = 0
if fs.lfru.GetHits(inode.Id) >= fs.flags.CacheToDiskHits {
*toFs = 1
}
}
if *toFs > 0 {
// Evict to disk
err := inode.OpenCacheFD()
if err != nil {
*toFs = 0
} else {
_, err := inode.DiskCacheFD.WriteAt(buf.data, int64(buf.offset))
if err != nil {
*toFs = 0
log.Errorf("Couldn't write %v bytes at offset %v to %v: %v",
len(buf.data), buf.offset, fs.flags.CachePath+"/"+inode.FullName(), err)
} else {
buf.onDisk = true
}
}
}
}
}

type FilterAction uint8

const (
FA_LEAVE FilterAction = 1
FA_DEL FilterAction = 2
FA_STOP FilterAction = 3
)

func filterBuffers(buffers []*FileBuffer, cb func(buf, prev *FileBuffer) FilterAction) []*FileBuffer {
del := -1
i := 0
var prev *FileBuffer
for ; i < len(buffers); i++ {
buf := buffers[i]
action := cb(buf, prev)
if action == FA_DEL {
if del == -1 {
if i > 0 {
prev = buffers[i-1]
}
del = i
}
} else {
if del >= 0 {
buffers = append(buffers[0 : del], buffers[i : ]...)
i = del
del = -1
}
prev = buf
}
if action == FA_STOP {
break
}
}
if del >= 0 {
buffers = append(buffers[0 : del], buffers[i : ]...)
del = -1
}
return buffers
}

func (fs *Goofys) WakeupFlusherAndWait(wait bool) {
fs.flusherMu.Lock()
if fs.flushPending == 0 {
Expand Down

0 comments on commit 7858e1d

Please sign in to comment.