diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 9884a97d1..d00597700 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -27,6 +27,7 @@ package file_cache import ( "context" + "errors" "fmt" "io" "io/fs" @@ -635,7 +636,24 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { newPath := strings.Replace(path, localSrcPath, localDstPath, 1) if !d.IsDir() { log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath) - fc.renameCachedFile(path, newPath) + // get locks + sflock := fc.fileLocks.Get(fc.getObjectName(path)) + dflock := fc.fileLocks.Get(fc.getObjectName(newPath)) + if path < newPath { + sflock.Lock() + dflock.Lock() + } else { + dflock.Lock() + sflock.Lock() + } + // complete local rename + err := fc.renameCachedFile(path, newPath, sflock, dflock) + if err != nil { + // there's really not much we can do to handle the error, so just log it + log.Err("FileCache::RenameDir : %s file rename failed. Directory state is inconsistent!", path) + } + sflock.Unlock() + dflock.Unlock() } else { log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath) // create the new directory @@ -669,6 +687,15 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { return nil } +func (fc *FileCache) getObjectName(localPath string) string { + relPath, err := filepath.Rel(fc.tmpPath, localPath) + if err != nil { + relPath = strings.TrimPrefix(localPath, fc.tmpPath+string(filepath.Separator)) + log.Warn("FileCache::getObjectName : filepath.Rel failed on path %s [%v]. Using TrimPrefix: %s", localPath, err, relPath) + } + return common.NormalizeObjectName(relPath) +} + // CreateFile: Create the file in local cache. func (fc *FileCache) CreateFile(options internal.CreateFileOptions) (*handlemap.Handle, error) { //defer exectime.StatTimeCurrentBlock("FileCache::CreateFile")() @@ -1366,16 +1393,23 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error { log.Trace("FileCache::RenameFile : src=%s, dst=%s", options.Src, options.Dst) + // acquire file locks sflock := fc.fileLocks.Get(options.Src) - sflock.Lock() - defer sflock.Unlock() - dflock := fc.fileLocks.Get(options.Dst) - dflock.Lock() + // prevent deadlock + if options.Src < options.Dst { + sflock.Lock() + dflock.Lock() + } else { + dflock.Lock() + sflock.Lock() + } + defer sflock.Unlock() defer dflock.Unlock() err := fc.NextComponent().RenameFile(options) - err = fc.validateStorageError(options.Src, err, "RenameFile", false) + localOnly := os.IsNotExist(err) + err = fc.validateStorageError(options.Src, err, "RenameFile", true) if err != nil { log.Err("FileCache::RenameFile : %s failed to rename file [%s]", options.Src, err.Error()) return err @@ -1388,22 +1422,62 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error { // if we do not perform rename operation locally and those destination files are cached then next time they are read // we will be serving the wrong content (as we did not rename locally, we still be having older destination files with // stale content). We either need to remove dest file as well from cache or just run rename to replace the content. - fc.renameCachedFile(localSrcPath, localDstPath) + localRenameErr := fc.renameCachedFile(localSrcPath, localDstPath, sflock, dflock) + if localRenameErr != nil { + // renameCachedFile only returns an error when we are at risk for data loss + if !localOnly { + // we must reverse the cloud rename operation to prevent data loss + err := fc.NextComponent().RenameFile(internal.RenameFileOptions{ + Src: options.Dst, + Dst: options.Src, + }) + err = fc.validateStorageError(options.Src, err, "RenameFile", false) + if err != nil { + log.Err("FileCache::RenameFile : %s failed to reverse cloud rename to avoid data loss! [%v]", options.Src, err) + } + localRenameErr = errors.Join(localRenameErr, err) + } + return localRenameErr + } + + // update any open handles to the file with its new name + if sflock.Count() > 0 { + handlemap.GetHandles().Range(func(key, value any) bool { + handle := value.(*handlemap.Handle) + if handle.Path == options.Src { + handle.Path = options.Dst + } + return true + }) + } return nil } -func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) { +func (fc *FileCache) renameCachedFile(localSrcPath, localDstPath string, sflock, dflock *common.LockMapItem) error { err := os.Rename(localSrcPath, localDstPath) if err != nil { - // if rename fails, we just delete the source file anyway - log.Warn("FileCache::RenameDir : Failed to rename local file %s -> %s. Here's why: %v", localSrcPath, localDstPath, err) - } else { + if os.IsNotExist(err) { + // Case 1 + log.Info("FileCache::renameCachedFile : %s source file not cached", localSrcPath) + } else { + log.Warn("FileCache::renameCachedFile : %s -> %s Failed to rename local file. Here's why: %v", localSrcPath, localDstPath, err) + // if the file is not open, it should be backed up already + if sflock.Count() > 0 { + // abort rename to prevent data loss! + log.Err("FileCache::renameCachedFile : %s Failed rename and src is open! Rename should be aborted...", localSrcPath) + return err + } + } + } else if err == nil { + log.Debug("FileCache::renameCachedFile : %s -> %s Successfully renamed local file", localSrcPath, localDstPath) fc.policy.CacheValid(localDstPath) } // delete the source from our cache policy // this will also delete the source file from local storage (if rename failed) fc.policy.CachePurge(localSrcPath) + + return nil } // TruncateFile: Update the file with its new size. diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 3e033eda7..f98f75205 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -34,6 +34,7 @@ import ( "math" "os" "os/exec" + "path" "path/filepath" "regexp" "runtime" @@ -1321,25 +1322,6 @@ func (suite *fileCacheTestSuite) TestRenameFileInCache() { suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: openHandle}) } -func (suite *fileCacheTestSuite) TestRenameFileCase2() { - defer suite.cleanupTest() - // Default is to not create empty files on create file to support immutable storage. - src := "source3" - dst := "destination3" - suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0777}) - - err := suite.fileCache.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst}) - suite.assert.Error(err) - suite.assert.Equal(syscall.EIO, err) - - // Src should be in local cache (since we failed the operation) - suite.assert.FileExists(filepath.Join(suite.cache_path, src)) - // Src should not be in fake storage - suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, src)) - // Dst should not be in fake storage - suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, dst)) -} - func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { defer suite.cleanupTest() suite.cleanupTest() @@ -1375,6 +1357,163 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall not exists in cache } +func (suite *fileCacheTestSuite) TestRenameOpenFileCase1() { + defer suite.cleanupTest() + + src := "source5" + dst := "destination5" + + // create file in cloud + handle, err := suite.loopback.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666}) + suite.assert.NoError(err) + err = suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) + suite.assert.NoError(err) + + // open file for writing + handle, err = suite.fileCache.OpenFile(internal.OpenFileOptions{Name: src, Flags: os.O_RDWR, Mode: 0777}) + suite.assert.NoError(err) + handlemap.Add(handle) + // Path should not be in the file cache (lazy open) + suite.assert.NoFileExists(suite.cache_path + "/" + src) + + // rename open file + err = suite.fileCache.RenameFile(internal.RenameFileOptions{ + Src: src, + Dst: dst, + }) + suite.assert.NoError(err) + // rename succeeded in cloud + suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, src)) + suite.assert.FileExists(filepath.Join(suite.fake_storage_path, dst)) + // still in lazy open state + suite.assert.NoFileExists(filepath.Join(suite.cache_path, src)) + suite.assert.NoFileExists(filepath.Join(suite.cache_path, dst)) + + // write to file handle + data := []byte("newdata") + n, err := suite.fileCache.WriteFile(internal.WriteFileOptions{ + Handle: handle, + Data: data, + }) + suite.assert.NoError(err) + suite.assert.Equal(len(data), n) + // open is completed (file is downloaded), and writes go to the correct file + suite.assert.NoFileExists(filepath.Join(suite.cache_path, src)) + suite.assert.FileExists(filepath.Join(suite.cache_path, dst)) + + // Close file handle + err = suite.fileCache.CloseFile(internal.CloseFileOptions{ + Handle: handle, + }) + suite.assert.NoError(err) + + // Check cloud storage + suite.assert.NoFileExists(path.Join(suite.fake_storage_path, src)) // Src does not exist + suite.assert.FileExists(path.Join(suite.fake_storage_path, dst)) // Dst does exist + dstData, err := os.ReadFile(path.Join(suite.fake_storage_path, dst)) + suite.assert.NoError(err) + suite.assert.Equal(data, dstData) +} + +func (suite *fileCacheTestSuite) TestRenameOpenFileCase2() { + defer suite.cleanupTest() + + src := "source6" + dst := "destination6" + + // create source file + handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666}) + suite.assert.NoError(err) + handlemap.Add(handle) + // Path should be in the file cache + suite.assert.FileExists(suite.cache_path + "/" + src) + + // rename open file + err = suite.fileCache.RenameFile(internal.RenameFileOptions{ + Src: src, + Dst: dst, + }) + suite.assert.NoError(err) + + // write to file handle + data := []byte("newdata") + n, err := suite.fileCache.WriteFile(internal.WriteFileOptions{ + Handle: handle, + Data: data, + }) + suite.assert.NoError(err) + suite.assert.Equal(len(data), n) + + // Close file handle + err = suite.fileCache.CloseFile(internal.CloseFileOptions{ + Handle: handle, + }) + suite.assert.NoError(err) + + // Check cloud storage + suite.assert.NoFileExists(path.Join(suite.fake_storage_path, src)) // Src does not exist + suite.assert.FileExists(path.Join(suite.fake_storage_path, dst)) // Dst does exist + dstData, err := os.ReadFile(path.Join(suite.fake_storage_path, dst)) + suite.assert.NoError(err) + suite.assert.Equal(data, dstData) +} + +func (suite *fileCacheTestSuite) TestRenameOpenFileCase3() { + defer suite.cleanupTest() + + // Setup + src := "source7" + dst := "destination7" + // create source file + handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666}) + suite.assert.NoError(err) + handlemap.Add(handle) + // Path should be in the file cache + suite.assert.FileExists(suite.cache_path + "/" + src) + // write to file handle + initialData := []byte("initialData") + n, err := suite.fileCache.WriteFile(internal.WriteFileOptions{ + Handle: handle, + Data: initialData, + }) + suite.assert.NoError(err) + suite.assert.Equal(len(initialData), n) + // flush to cloud + err = suite.fileCache.FlushFile(internal.FlushFileOptions{ + Handle: handle, + }) + suite.assert.NoError(err) + suite.assert.FileExists(filepath.Join(suite.fake_storage_path, src)) + + // rename open file + err = suite.fileCache.RenameFile(internal.RenameFileOptions{ + Src: src, + Dst: dst, + }) + suite.assert.NoError(err) + // write to file handle + newData := []byte("newData") + n, err = suite.fileCache.WriteFile(internal.WriteFileOptions{ + Handle: handle, + Data: newData, + Offset: int64(len(initialData)), + }) + suite.assert.NoError(err) + suite.assert.Equal(len(newData), n) + // Close file handle + err = suite.fileCache.CloseFile(internal.CloseFileOptions{ + Handle: handle, + }) + suite.assert.NoError(err) + + // Check that cloud storage got all data and file was renamed properly + suite.assert.NoFileExists(path.Join(suite.fake_storage_path, src)) // Src does not exist + suite.assert.FileExists(path.Join(suite.fake_storage_path, dst)) // Dst does exist + dstData, err := os.ReadFile(path.Join(suite.fake_storage_path, dst)) + suite.assert.NoError(err) + suite.assert.Equal(append(initialData, newData...), dstData) +} + func (suite *fileCacheTestSuite) TestTruncateFileNotInCache() { defer suite.cleanupTest() // Setup