Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update open file handles in RenameFile #404

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 85 additions & 11 deletions component/file_cache/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package file_cache

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
Expand Down Expand Up @@ -635,7 +636,24 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
newPath := strings.Replace(path, localSrcPath, localDstPath, 1)
if !d.IsDir() {
log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath)
fc.renameCachedFile(path, newPath)
// get locks
sflock := fc.fileLocks.Get(fc.getObjectName(path))
dflock := fc.fileLocks.Get(fc.getObjectName(newPath))
if path < newPath {
sflock.Lock()
dflock.Lock()
} else {
dflock.Lock()
sflock.Lock()
}
// complete local rename
err := fc.renameCachedFile(path, newPath, sflock, dflock)
if err != nil {
// there's really not much we can do to handle the error, so just log it
log.Err("FileCache::RenameDir : %s file rename failed. Directory state is inconsistent!", path)
}
sflock.Unlock()
dflock.Unlock()
} else {
log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath)
// create the new directory
Expand Down Expand Up @@ -669,6 +687,15 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
return nil
}

// getObjectName maps a path inside the local cache directory (fc.tmpPath) to
// the object name used as the key for per-file locks.
//
// The path is made relative to fc.tmpPath with filepath.Rel. If that fails,
// the cache directory prefix is trimmed textually instead and a warning is
// logged. The result is always passed through common.NormalizeObjectName.
func (fc *FileCache) getObjectName(localPath string) string {
	objectName, relErr := filepath.Rel(fc.tmpPath, localPath)
	if relErr == nil {
		return common.NormalizeObjectName(objectName)
	}

	// filepath.Rel could not relate the two paths; fall back to stripping the
	// cache root (plus the path separator) from the front of the local path.
	cacheRoot := fc.tmpPath + string(filepath.Separator)
	objectName = strings.TrimPrefix(localPath, cacheRoot)
	log.Warn("FileCache::getObjectName : filepath.Rel failed on path %s [%v]. Using TrimPrefix: %s", localPath, relErr, objectName)
	return common.NormalizeObjectName(objectName)
}

// CreateFile: Create the file in local cache.
func (fc *FileCache) CreateFile(options internal.CreateFileOptions) (*handlemap.Handle, error) {
//defer exectime.StatTimeCurrentBlock("FileCache::CreateFile")()
Expand Down Expand Up @@ -1366,16 +1393,23 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr
func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error {
log.Trace("FileCache::RenameFile : src=%s, dst=%s", options.Src, options.Dst)

// acquire file locks
sflock := fc.fileLocks.Get(options.Src)
sflock.Lock()
defer sflock.Unlock()

dflock := fc.fileLocks.Get(options.Dst)
dflock.Lock()
// prevent deadlock
if options.Src < options.Dst {
foodprocessor marked this conversation as resolved.
Show resolved Hide resolved
sflock.Lock()
dflock.Lock()
} else {
dflock.Lock()
sflock.Lock()
}
defer sflock.Unlock()
defer dflock.Unlock()

err := fc.NextComponent().RenameFile(options)
err = fc.validateStorageError(options.Src, err, "RenameFile", false)
localOnly := os.IsNotExist(err)
err = fc.validateStorageError(options.Src, err, "RenameFile", true)
if err != nil {
log.Err("FileCache::RenameFile : %s failed to rename file [%s]", options.Src, err.Error())
return err
Expand All @@ -1388,22 +1422,62 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error {
// if we do not perform rename operation locally and those destination files are cached then next time they are read
// we will be serving the wrong content (as we did not rename locally, we still be having older destination files with
// stale content). We either need to remove dest file as well from cache or just run rename to replace the content.
fc.renameCachedFile(localSrcPath, localDstPath)
localRenameErr := fc.renameCachedFile(localSrcPath, localDstPath, sflock, dflock)
if localRenameErr != nil {
// renameCachedFile only returns an error when we are at risk for data loss
if !localOnly {
// we must reverse the cloud rename operation to prevent data loss
err := fc.NextComponent().RenameFile(internal.RenameFileOptions{
Src: options.Dst,
Dst: options.Src,
})
err = fc.validateStorageError(options.Src, err, "RenameFile", false)
if err != nil {
log.Err("FileCache::RenameFile : %s failed to reverse cloud rename to avoid data loss! [%v]", options.Src, err)
}
localRenameErr = errors.Join(localRenameErr, err)
}
return localRenameErr
}

// update any open handles to the file with its new name
if sflock.Count() > 0 {
handlemap.GetHandles().Range(func(key, value any) bool {
handle := value.(*handlemap.Handle)
if handle.Path == options.Src {
handle.Path = options.Dst
}
return true
})
}

return nil
}

func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) {
func (fc *FileCache) renameCachedFile(localSrcPath, localDstPath string, sflock, dflock *common.LockMapItem) error {
err := os.Rename(localSrcPath, localDstPath)
if err != nil {
// if rename fails, we just delete the source file anyway
log.Warn("FileCache::RenameDir : Failed to rename local file %s -> %s. Here's why: %v", localSrcPath, localDstPath, err)
} else {
if os.IsNotExist(err) {
// Case 1
log.Info("FileCache::renameCachedFile : %s source file not cached", localSrcPath)
} else {
log.Warn("FileCache::renameCachedFile : %s -> %s Failed to rename local file. Here's why: %v", localSrcPath, localDstPath, err)
// if the file is not open, it should be backed up already
if sflock.Count() > 0 {
// abort rename to prevent data loss!
log.Err("FileCache::renameCachedFile : %s Failed rename and src is open! Rename should be aborted...", localSrcPath)
return err
}
}
} else if err == nil {
log.Debug("FileCache::renameCachedFile : %s -> %s Successfully renamed local file", localSrcPath, localDstPath)
fc.policy.CacheValid(localDstPath)
}
// delete the source from our cache policy
// this will also delete the source file from local storage (if rename failed)
fc.policy.CachePurge(localSrcPath)

return nil
}

// TruncateFile: Update the file with its new size.
Expand Down
177 changes: 158 additions & 19 deletions component/file_cache/file_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"math"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"runtime"
Expand Down Expand Up @@ -1321,25 +1322,6 @@ func (suite *fileCacheTestSuite) TestRenameFileInCache() {
suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: openHandle})
}

// TestRenameFileCase2 creates a file through the file cache and renames it
// without ever flushing it to storage. The rename is expected to fail with
// EIO, leaving the source in the local cache and nothing in fake storage.
func (suite *fileCacheTestSuite) TestRenameFileCase2() {
	defer suite.cleanupTest()

	const (
		src = "source3"
		dst = "destination3"
	)

	// Default is to not create empty files on create file to support immutable storage.
	suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0777})

	renameErr := suite.fileCache.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst})
	suite.assert.Error(renameErr)
	suite.assert.Equal(syscall.EIO, renameErr)

	// The operation failed, so the source must still be present in the local cache...
	suite.assert.FileExists(filepath.Join(suite.cache_path, src))
	// ...and neither the source nor the destination may exist in fake storage.
	suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, src))
	suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, dst))
}

func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() {
defer suite.cleanupTest()
suite.cleanupTest()
Expand Down Expand Up @@ -1375,6 +1357,163 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() {
suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall exist in cache
}

// TestRenameOpenFileCase1 renames a file that is open through the file cache
// but has not been downloaded yet (lazy open). It then writes through the
// original handle and checks that the data ends up under the destination
// name, both in the local cache and in storage.
func (suite *fileCacheTestSuite) TestRenameOpenFileCase1() {
	defer suite.cleanupTest()

	const (
		src = "source5"
		dst = "destination5"
	)

	// Seed the file in the cloud by going through the loopback component directly.
	cloudHandle, err := suite.loopback.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666})
	suite.assert.NoError(err)
	err = suite.loopback.CloseFile(internal.CloseFileOptions{Handle: cloudHandle})
	suite.assert.NoError(err)

	// Open the file for writing through the file cache and register the handle.
	openHandle, err := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: src, Flags: os.O_RDWR, Mode: 0777})
	suite.assert.NoError(err)
	handlemap.Add(openHandle)
	// Lazy open: nothing should have been downloaded into the cache yet.
	suite.assert.NoFileExists(suite.cache_path + "/" + src)

	// Rename the file while the handle is still open.
	renameOptions := internal.RenameFileOptions{Src: src, Dst: dst}
	err = suite.fileCache.RenameFile(renameOptions)
	suite.assert.NoError(err)
	// The rename must have taken effect in the cloud...
	suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, src))
	suite.assert.FileExists(filepath.Join(suite.fake_storage_path, dst))
	// ...while the cache is still in the lazy-open state (neither name present).
	suite.assert.NoFileExists(filepath.Join(suite.cache_path, src))
	suite.assert.NoFileExists(filepath.Join(suite.cache_path, dst))

	// Write through the handle that was opened under the old name.
	payload := []byte("newdata")
	written, err := suite.fileCache.WriteFile(internal.WriteFileOptions{Handle: openHandle, Data: payload})
	suite.assert.NoError(err)
	suite.assert.Equal(len(payload), written)
	// The write completes the open (download), and it must land on the new name.
	suite.assert.NoFileExists(filepath.Join(suite.cache_path, src))
	suite.assert.FileExists(filepath.Join(suite.cache_path, dst))

	// Closing the handle should push the written data to storage.
	err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: openHandle})
	suite.assert.NoError(err)

	// Verify cloud storage: only the destination exists and it holds the new data.
	cloudDst := path.Join(suite.fake_storage_path, dst)
	suite.assert.NoFileExists(path.Join(suite.fake_storage_path, src)) // Src does not exist
	suite.assert.FileExists(cloudDst)                                  // Dst does exist
	uploaded, err := os.ReadFile(cloudDst)
	suite.assert.NoError(err)
	suite.assert.Equal(payload, uploaded)
}

// TestRenameOpenFileCase2 renames a file that was just created through the
// file cache and is still open. It then writes through the original handle,
// closes it, and checks that storage holds the data under the destination
// name only.
func (suite *fileCacheTestSuite) TestRenameOpenFileCase2() {
	defer suite.cleanupTest()

	const (
		src = "source6"
		dst = "destination6"
	)

	// Create the source through the file cache and register the open handle.
	openHandle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666})
	suite.assert.NoError(err)
	handlemap.Add(openHandle)
	// The newly created file must be present in the local cache.
	suite.assert.FileExists(suite.cache_path + "/" + src)

	// Rename the file while the handle is still open.
	renameOptions := internal.RenameFileOptions{Src: src, Dst: dst}
	err = suite.fileCache.RenameFile(renameOptions)
	suite.assert.NoError(err)

	// Write through the handle that was created under the old name.
	payload := []byte("newdata")
	written, err := suite.fileCache.WriteFile(internal.WriteFileOptions{Handle: openHandle, Data: payload})
	suite.assert.NoError(err)
	suite.assert.Equal(len(payload), written)

	// Close the handle so the data is uploaded.
	err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: openHandle})
	suite.assert.NoError(err)

	// Verify cloud storage: only the destination exists and it holds the new data.
	cloudDst := path.Join(suite.fake_storage_path, dst)
	suite.assert.NoFileExists(path.Join(suite.fake_storage_path, src)) // Src does not exist
	suite.assert.FileExists(cloudDst)                                  // Dst does exist
	uploaded, err := os.ReadFile(cloudDst)
	suite.assert.NoError(err)
	suite.assert.Equal(payload, uploaded)
}

// TestRenameOpenFileCase3 renames an open file that has already been flushed
// to storage. Data written before the rename and data written afterwards
// (through the same handle) must both end up in the destination object.
func (suite *fileCacheTestSuite) TestRenameOpenFileCase3() {
	defer suite.cleanupTest()

	// Setup
	const (
		src = "source7"
		dst = "destination7"
	)

	// Create the source through the file cache and register the open handle.
	openHandle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666})
	suite.assert.NoError(err)
	handlemap.Add(openHandle)
	// The newly created file must be present in the local cache.
	suite.assert.FileExists(suite.cache_path + "/" + src)

	// Write the first chunk of data through the handle.
	initialData := []byte("initialData")
	written, err := suite.fileCache.WriteFile(internal.WriteFileOptions{Handle: openHandle, Data: initialData})
	suite.assert.NoError(err)
	suite.assert.Equal(len(initialData), written)

	// Flush so that the source exists in the cloud before the rename.
	err = suite.fileCache.FlushFile(internal.FlushFileOptions{Handle: openHandle})
	suite.assert.NoError(err)
	suite.assert.FileExists(filepath.Join(suite.fake_storage_path, src))

	// Rename the file while the handle is still open.
	renameOptions := internal.RenameFileOptions{Src: src, Dst: dst}
	err = suite.fileCache.RenameFile(renameOptions)
	suite.assert.NoError(err)

	// Append the second chunk through the same handle, right after the first chunk.
	newData := []byte("newData")
	appendOptions := internal.WriteFileOptions{
		Handle: openHandle,
		Data:   newData,
		Offset: int64(len(initialData)),
	}
	written, err = suite.fileCache.WriteFile(appendOptions)
	suite.assert.NoError(err)
	suite.assert.Equal(len(newData), written)

	// Close the handle so the remaining data is uploaded.
	err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: openHandle})
	suite.assert.NoError(err)

	// Check that cloud storage got all data and file was renamed properly
	cloudDst := path.Join(suite.fake_storage_path, dst)
	suite.assert.NoFileExists(path.Join(suite.fake_storage_path, src)) // Src does not exist
	suite.assert.FileExists(cloudDst)                                  // Dst does exist
	uploaded, err := os.ReadFile(cloudDst)
	suite.assert.NoError(err)
	expected := append(initialData, newData...)
	suite.assert.Equal(expected, uploaded)
}

func (suite *fileCacheTestSuite) TestTruncateFileNotInCache() {
defer suite.cleanupTest()
// Setup
Expand Down
Loading