Skip to content

Commit

Permalink
Fix the problem of the empty directory for breakpoint resume
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzong18 committed Nov 23, 2022
1 parent 25a7611 commit b6adcc9
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 4 deletions.
8 changes: 6 additions & 2 deletions oss/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,15 @@ func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, op
}

func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, versionId, destFile string) string {
if cpConf.FilePath == "" && cpConf.DirPath != "" {
if cpConf.FilePath == "" {
src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
absPath, _ := filepath.Abs(destFile)
cpFileName := getCpFileName(src, absPath, versionId)
cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
if cpConf.DirPath != "" {
cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
} else {
cpConf.FilePath = cpFileName
}
}
return cpConf.FilePath
}
Expand Down
154 changes: 154 additions & 0 deletions oss/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,160 @@ func (s *OssDownloadSuite) TestDownloadRoutineWithRecovery(c *C) {
c.Assert(err, IsNil)
}

// TestDownloadWithEmptyPoint multi-routine resumable download with empty file path
func (s *OssDownloadSuite) TestDownloadWithEmptyFilePath(c *C) {
objectName := objectNamePrefix + RandStr(8)
fileName := "../sample/BingWallpaper-2015-11-07.jpg"
newFile := RandStr(8) + ".jpg"

// Upload a file
err := s.bucket.UploadFile(objectName, fileName, 100*1024, Routines(3))
c.Assert(err, IsNil)

// Download a file with default checkpoint
downloadPartHooker = DownErrorHooker
err = s.bucket.DownloadFile(objectName, newFile, 100*1024, Checkpoint(true, ""))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "ErrorHooker")
downloadPartHooker = defaultDownloadPartHook

// Check
dcp := downloadCheckpoint{}
cpConf := cpConfig{IsEnable: true, FilePath: ""}
cpFilePath := getDownloadCpFilePath(&cpConf, s.bucket.BucketName, objectName, "", newFile)
err = dcp.load(cpFilePath)
c.Assert(err, IsNil)
c.Assert(dcp.Magic, Equals, downloadCpMagic)
c.Assert(len(dcp.MD5), Equals, len("LC34jZU5xK4hlxi3Qn3XGQ=="))
c.Assert(dcp.FilePath, Equals, newFile)
c.Assert(dcp.ObjStat.Size, Equals, int64(482048))
c.Assert(len(dcp.ObjStat.LastModified) > 0, Equals, true)
c.Assert(dcp.ObjStat.Etag, Equals, "\"2351E662233817A7AE974D8C5B0876DD-5\"")
c.Assert(dcp.Object, Equals, objectName)
c.Assert(len(dcp.Parts), Equals, 5)
c.Assert(len(dcp.todoParts()), Equals, 1)

err = s.bucket.DownloadFile(objectName, newFile, 100*1024, Checkpoint(true, ""))
c.Assert(err, IsNil)
//download success, checkpoint file has been deleted
err = dcp.load(cpFilePath)
c.Assert(err, NotNil)

eq, err := compareFiles(fileName, newFile)
c.Assert(err, IsNil)
c.Assert(eq, Equals, true)

// Download a file with default checkpoint
downloadPartHooker = DownErrorHooker
err = s.bucket.DownloadFile(objectName, newFile, 100*1024, Checkpoint(true, ""), CheckpointDir(true, "../"))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "ErrorHooker")
downloadPartHooker = defaultDownloadPartHook

// Check
cpConf = cpConfig{IsEnable: true, FilePath: "", DirPath: "../"}
cpFilePath = getDownloadCpFilePath(&cpConf, s.bucket.BucketName, objectName, "", newFile)
err = dcp.load(cpFilePath)
c.Assert(err, IsNil)
c.Assert(dcp.Magic, Equals, downloadCpMagic)
c.Assert(len(dcp.MD5), Equals, len("LC34jZU5xK4hlxi3Qn3XGQ=="))
c.Assert(dcp.FilePath, Equals, newFile)
c.Assert(dcp.ObjStat.Size, Equals, int64(482048))
c.Assert(len(dcp.ObjStat.LastModified) > 0, Equals, true)
c.Assert(dcp.ObjStat.Etag, Equals, "\"2351E662233817A7AE974D8C5B0876DD-5\"")
c.Assert(dcp.Object, Equals, objectName)
c.Assert(len(dcp.Parts), Equals, 5)
c.Assert(len(dcp.todoParts()), Equals, 1)

err = s.bucket.DownloadFile(objectName, newFile, 100*1024, Checkpoint(true, ""), CheckpointDir(true, "../"))
c.Assert(err, IsNil)
//download success, checkpoint file has been deleted
err = dcp.load(cpFilePath)
c.Assert(err, NotNil)

eq, err = compareFiles(fileName, newFile)
c.Assert(err, IsNil)
c.Assert(eq, Equals, true)

os.Remove(newFile)
os.Remove(cpFilePath)
}

// TestDownloadWithEmptyPoint multi-routine resumable download version id with empty file path
func (s *OssDownloadSuite) TestVersioningDownloadWithEmptyFilePathCheckPoint(c *C) {
// create a bucket with default proprety
client, err := New(endpoint, accessID, accessKey)
c.Assert(err, IsNil)

bucketName := bucketNamePrefix + RandLowStr(6)
err = client.CreateBucket(bucketName)
c.Assert(err, IsNil)

bucket, err := client.Bucket(bucketName)

// put bucket version:enabled
var versioningConfig VersioningConfig
versioningConfig.Status = string(VersionEnabled)
err = client.SetBucketVersioning(bucketName, versioningConfig)
c.Assert(err, IsNil)
time.Sleep(timeoutInOperation)

// begin test
objectName := objectNamePrefix + RandStr(8)
fileName := "test-file-" + RandStr(8)
fileData := RandStr(500 * 1024)
CreateFile(fileName, fileData, c)
newFile := RandStr(8) + ".jpg"

// Upload a file
var respHeader http.Header
options := []Option{Routines(3), GetResponseHeader(&respHeader)}
err = bucket.UploadFile(objectName, fileName, 100*1024, options...)
c.Assert(err, IsNil)
versionId := GetVersionId(respHeader)
c.Assert(len(versionId) > 0, Equals, true)

// Resumable download with checkpoint dir
os.Remove(newFile)
downloadPartHooker = DownErrorHooker
options = []Option{CheckpointDir(true, "./"), VersionId(versionId)}

err = bucket.DownloadFile(objectName, newFile, 100*1024, options...)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "ErrorHooker")
downloadPartHooker = defaultDownloadPartHook
// Check
dcp := downloadCheckpoint{}
cpConf := cpConfig{IsEnable: true, DirPath: "./"}
cpFilePath := getDownloadCpFilePath(&cpConf, bucketName, objectName, versionId, newFile)
err = dcp.load(cpFilePath)
c.Assert(err, IsNil)
c.Assert(dcp.Magic, Equals, downloadCpMagic)
c.Assert(len(dcp.MD5), Equals, len("LC34jZU5xK4hlxi3Qn3XGQ=="))
c.Assert(dcp.FilePath, Equals, newFile)
c.Assert(dcp.ObjStat.Size, Equals, int64(512000))
c.Assert(len(dcp.ObjStat.LastModified) > 0, Equals, true)
c.Assert(dcp.Object, Equals, objectName)
c.Assert(len(dcp.Parts), Equals, 5)
c.Assert(len(dcp.todoParts()), Equals, 1)

options = []Option{CheckpointDir(true, "./"), VersionId(versionId), GetResponseHeader(&respHeader)}
err = bucket.DownloadFile(objectName, newFile, 100*1024, options...)
c.Assert(err, IsNil)
c.Assert(GetVersionId(respHeader), Equals, versionId)

eq, err := compareFiles(fileName, newFile)
c.Assert(err, IsNil)
c.Assert(eq, Equals, true)

os.Remove(fileName)
os.Remove(newFile)
os.Remove(cpFilePath)
err = bucket.DeleteObject(objectName)
c.Assert(err, IsNil)
ForceDeleteBucket(client, bucketName, c)
}

// TestDownloadOption options
func (s *OssDownloadSuite) TestDownloadOption(c *C) {
objectName := objectNamePrefix + RandStr(8)
Expand Down
8 changes: 6 additions & 2 deletions oss/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, opti
}

func getUploadCpFilePath(cpConf *cpConfig, srcFile, destBucket, destObject string) string {
if cpConf.FilePath == "" && cpConf.DirPath != "" {
if cpConf.FilePath == "" {
dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
absPath, _ := filepath.Abs(srcFile)
cpFileName := getCpFileName(absPath, dest, "")
cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
if cpConf.DirPath != "" {
cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
} else {
cpConf.FilePath = cpFileName
}
}
return cpConf.FilePath
}
Expand Down
85 changes: 85 additions & 0 deletions oss/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,91 @@ func (s *OssUploadSuite) TestUploadRoutineWithRecovery(c *C) {
c.Assert(err, IsNil)
}

// TestUploadRoutineWithEmptyCheckPoint is multi-routine upload with empty check point
func (s *OssUploadSuite) TestUploadRoutineWithEmptyFilePathCheckPoint(c *C) {
objectName := objectNamePrefix + RandStr(8)
fileName := "../sample/BingWallpaper-2015-11-07.jpg"
newFile := "upload-new-file-2.jpg"

// Use default routines and default CP file path
// First upload for 4 parts
uploadPartHooker = ErrorHooker
err := s.bucket.UploadFile(objectName, fileName, 100*1024, Checkpoint(true, ""))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "ErrorHooker")
uploadPartHooker = defaultUploadPart
// Check CP
ucp := uploadCheckpoint{}
cpConf := cpConfig{IsEnable: true, FilePath: ""}
cpFilePath := getUploadCpFilePath(&cpConf, fileName, s.bucket.BucketName, objectName)
err = ucp.load(cpFilePath)
c.Assert(err, IsNil)
c.Assert(ucp.Magic, Equals, uploadCpMagic)
c.Assert(len(ucp.MD5), Equals, len("LC34jZU5xK4hlxi3Qn3XGQ=="))
c.Assert(ucp.FilePath, Equals, fileName)
c.Assert(ucp.FileStat.Size, Equals, int64(482048))
c.Assert(len(ucp.FileStat.LastModified.String()) > 0, Equals, true)
c.Assert(ucp.FileStat.MD5, Equals, "")
c.Assert(ucp.ObjectKey, Equals, objectName)
c.Assert(len(ucp.UploadID), Equals, len("3F79722737D1469980DACEDCA325BB52"))
c.Assert(len(ucp.Parts), Equals, 5)
c.Assert(len(ucp.todoParts()), Equals, 1)
c.Assert(len(ucp.allParts()), Equals, 5)

// Second upload, finish the remaining part
err = s.bucket.UploadFile(objectName, fileName, 100*1024, Checkpoint(true, ""))
c.Assert(err, IsNil)

os.Remove(newFile)
os.Remove(cpFilePath)
err = s.bucket.GetObjectToFile(objectName, newFile)
c.Assert(err, IsNil)

eq, err := compareFiles(fileName, newFile)
c.Assert(err, IsNil)
c.Assert(eq, Equals, true)

err = s.bucket.DeleteObject(objectName)
c.Assert(err, IsNil)

uploadPartHooker = ErrorHooker
err = s.bucket.UploadFile(objectName, fileName, 100*1024, CheckpointDir(true, "./"))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "ErrorHooker")
uploadPartHooker = defaultUploadPart
// Check CP
cpConf = cpConfig{IsEnable: true, DirPath: "./"}
cpFilePath = getUploadCpFilePath(&cpConf, fileName, s.bucket.BucketName, objectName)
fmt.Printf("cpFilePath is:%#v\n", cpFilePath)
err = ucp.load(cpFilePath)
c.Assert(err, IsNil)
c.Assert(ucp.Magic, Equals, uploadCpMagic)
c.Assert(len(ucp.MD5), Equals, len("LC34jZU5xK4hlxi3Qn3XGQ=="))
c.Assert(ucp.FilePath, Equals, fileName)
c.Assert(ucp.FileStat.Size, Equals, int64(482048))
c.Assert(len(ucp.FileStat.LastModified.String()) > 0, Equals, true)
c.Assert(ucp.FileStat.MD5, Equals, "")
c.Assert(ucp.ObjectKey, Equals, objectName)
c.Assert(len(ucp.UploadID), Equals, len("3F79722737D1469980DACEDCA325BB52"))
c.Assert(len(ucp.Parts), Equals, 5)
c.Assert(len(ucp.todoParts()), Equals, 1)
c.Assert(len(ucp.allParts()), Equals, 5)

err = s.bucket.UploadFile(objectName, fileName, 100*1024, CheckpointDir(true, "./"))
c.Assert(err, IsNil)

os.Remove(newFile)
os.Remove(cpFilePath)

err = s.bucket.GetObjectToFile(objectName, newFile)
c.Assert(err, IsNil)
eq, err = compareFiles(fileName, newFile)
c.Assert(err, IsNil)
c.Assert(eq, Equals, true)
err = s.bucket.DeleteObject(objectName)
c.Assert(err, IsNil)
}

// TestUploadRoutineWithRecoveryNegative is multiroutineed upload without checkpoint
func (s *OssUploadSuite) TestUploadRoutineWithRecoveryNegative(c *C) {
objectName := objectNamePrefix + RandStr(8)
Expand Down

0 comments on commit b6adcc9

Please sign in to comment.