Skip to content

Commit

Permalink
cmd/sync: check the destination of the symbolic link (#4082)
Browse files Browse the repository at this point in the history
Fix two issues

1. symbolic links do not support checksums
2. `check-new` is invalid when `--links`

close #4067
  • Loading branch information
zhijian-pro authored Oct 8, 2023
1 parent 81012aa commit 558ed6a
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 14 deletions.
42 changes: 28 additions & 14 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,23 @@ func copyPerms(dst object.ObjectStorage, obj object.Object) {
logger.Debugf("Copied permissions (%s:%s:%s) for %s in %s", fi.Owner(), fi.Group(), fi.Mode(), key, time.Since(start))
}

func doCheckSum(src, dst object.ObjectStorage, key string, size int64, equal *bool) error {
func doCheckSum(src, dst object.ObjectStorage, key string, obj object.Object, config *Config, equal *bool) error {
if obj.IsSymlink() && config.Links && (config.CheckAll || config.CheckNew) {
var srcLink, dstLink string
var err error
if s, ok := src.(object.SupportSymlink); ok {
if srcLink, err = s.Readlink(key); err != nil {
return err
}
}
if s, ok := dst.(object.SupportSymlink); ok {
if dstLink, err = s.Readlink(key); err != nil {
return err
}
}
*equal = srcLink == dstLink && srcLink != "" && dstLink != ""
return nil
}
abort := make(chan struct{})
checkPart := func(offset, length int64) error {
if limiter != nil {
Expand Down Expand Up @@ -273,16 +289,16 @@ func doCheckSum(src, dst object.ObjectStorage, key string, size int64, equal *bo
}

var err error
if size < maxBlock {
err = checkPart(0, size)
if obj.Size() < maxBlock {
err = checkPart(0, obj.Size())
} else {
n := int((size-1)/defaultPartSize) + 1
n := int((obj.Size()-1)/defaultPartSize) + 1
errs := make(chan error, n)
for i := 0; i < n; i++ {
go func(num int) {
sz := int64(defaultPartSize)
if num == n-1 {
sz = size - int64(num)*defaultPartSize
sz = obj.Size() - int64(num)*defaultPartSize
}
errs <- checkPart(int64(num)*defaultPartSize, sz)
}(i)
Expand All @@ -304,13 +320,13 @@ func doCheckSum(src, dst object.ObjectStorage, key string, size int64, equal *bo
return err
}

func checkSum(src, dst object.ObjectStorage, key string, size int64) (bool, error) {
func checkSum(src, dst object.ObjectStorage, key string, obj object.Object, config *Config) (bool, error) {
start := time.Now()
var equal bool
err := try(3, func() error { return doCheckSum(src, dst, key, size, &equal) })
err := try(3, func() error { return doCheckSum(src, dst, key, obj, config, &equal) })
if err == nil {
checked.Increment()
checkedBytes.IncrInt64(size)
checkedBytes.IncrInt64(obj.Size())
if equal {
logger.Debugf("Checked %s OK (and equal) in %s,", key, time.Since(start))
} else {
Expand Down Expand Up @@ -522,7 +538,7 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C
break
}
obj = obj.(*withSize).Object
if equal, err := checkSum(src, dst, key, obj.Size()); err != nil {
if equal, err := checkSum(src, dst, key, obj, config); err != nil {
failed.Increment()
break
} else if equal {
Expand Down Expand Up @@ -556,18 +572,16 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C
}
var err error
if config.Links && obj.IsSymlink() {
if err = copyLink(src, dst, key); err == nil {
copied.Increment()
break
if err = copyLink(src, dst, key); err != nil {
logger.Errorf("copy link failed: %s", err)
}
logger.Errorf("copy link failed: %s", err)
} else {
err = copyData(src, dst, key, obj.Size())
}

if err == nil && (config.CheckAll || config.CheckNew) {
var equal bool
if equal, err = checkSum(src, dst, key, obj.Size()); err == nil && !equal {
if equal, err = checkSum(src, dst, key, obj, config); err == nil && !equal {
err = fmt.Errorf("checksums of copied object %s don't match", key)
}
}
Expand Down
77 changes: 77 additions & 0 deletions pkg/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,83 @@ func TestSingleLink(t *testing.T) {
}
}

func TestSyncCheckAllLink(t *testing.T) {
defer func() {
_ = os.RemoveAll("/tmp/a")
_ = os.RemoveAll("/tmp/b")
}()

a, _ := object.CreateStorage("file", "/tmp/a/", "", "", "")
a.Put("a1", bytes.NewReader([]byte("test")))
as := a.(object.SupportSymlink)
as.Symlink("/tmp/a/a1", "l1")

b, _ := object.CreateStorage("file", "/tmp/b/", "", "", "")
bs := b.(object.SupportSymlink)
bs.Symlink("/tmp/b/a1", "l1")

if err := Sync(a, b, &Config{
Threads: 50,
Perms: true,
Links: true,
Quiet: true,
Limit: -1,
CheckAll: true,
}); err != nil {
t.Fatalf("sync: %s", err)
}

l1, err := bs.Readlink("l1")
if err != nil || l1 != "/tmp/a/a1" {
t.Fatalf("readlink: %s content: %s", err, l1)
}
content, err := b.Get("l1", 0, -1)
if err != nil {
t.Fatalf("get content failed: %s", err)
}
if c, err := io.ReadAll(content); err != nil || string(c) != "test" {
t.Fatalf("read content failed: err %s content %s", err, string(c))
}
}

func TestSyncCheckNewLink(t *testing.T) {
defer func() {
_ = os.RemoveAll("/tmp/a")
_ = os.RemoveAll("/tmp/b")
}()

a, _ := object.CreateStorage("file", "/tmp/a/", "", "", "")
a.Put("a1", bytes.NewReader([]byte("test")))
as := a.(object.SupportSymlink)
as.Symlink("/tmp/a/a1", "l1")

b, _ := object.CreateStorage("file", "/tmp/b/", "", "", "")
bs := b.(object.SupportSymlink)

if err := Sync(a, b, &Config{
Threads: 50,
Perms: true,
Links: true,
Quiet: true,
Limit: -1,
CheckNew: true,
}); err != nil {
t.Fatalf("sync: %s", err)
}

l1, err := bs.Readlink("l1")
if err != nil || l1 != "/tmp/a/a1" {
t.Fatalf("readlink: %s content: %s", err, l1)
}
content, err := b.Get("l1", 0, -1)
if err != nil {
t.Fatalf("get content failed: %s", err)
}
if c, err := io.ReadAll(content); err != nil || string(c) != "test" {
t.Fatalf("read content failed: err %s content %s", err, string(c))
}
}

func TestLimits(t *testing.T) {
defer func() {
_ = os.RemoveAll("/tmp/a/")
Expand Down

0 comments on commit 558ed6a

Please sign in to comment.