Skip to content

Commit

Permalink
cmd/sync: return list objects error (juicedata#4879)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro authored May 22, 2024
1 parent b78410d commit ca15ab5
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,28 +770,26 @@ func startSingleProducer(tasks chan<- object.Object, src, dst object.ObjectStora
return fmt.Errorf("list %s: %s", dst, err)
}
}
produce(tasks, srckeys, dstkeys, config)
return nil
return produce(tasks, srckeys, dstkeys, config)
}

func produce(tasks chan<- object.Object, srckeys, dstkeys <-chan object.Object, config *Config) {
func produce(tasks chan<- object.Object, srckeys, dstkeys <-chan object.Object, config *Config) error {
if len(config.rules) > 0 {
srckeys = filter(srckeys, config.rules, config)
dstkeys = filter(dstkeys, config.rules, config)
}
var dstobj object.Object
for obj := range srckeys {
if obj == nil {
logger.Errorf("Listing failed, stop syncing, waiting for pending ones")
return
return fmt.Errorf("listing failed, stop syncing, waiting for pending ones")
}
if !config.Dirs && obj.IsDir() {
logger.Debug("Ignore directory ", obj.Key())
continue
}
if config.Limit >= 0 {
if config.Limit == 0 {
return
return nil
}
config.Limit--
}
Expand All @@ -800,23 +798,22 @@ func produce(tasks chan<- object.Object, srckeys, dstkeys <-chan object.Object,
if dstobj != nil && obj.Key() > dstobj.Key() {
if config.DeleteDst {
if deleteFromDst(tasks, dstobj, config) {
return
return nil
}
}
dstobj = nil
}
if dstobj == nil {
for dstobj = range dstkeys {
if dstobj == nil {
logger.Errorf("Listing failed, stop syncing, waiting for pending ones")
return
return fmt.Errorf("listing failed, stop syncing, waiting for pending ones")
}
if obj.Key() <= dstobj.Key() {
break
}
if config.DeleteDst {
if deleteFromDst(tasks, dstobj, config) {
return
return nil
}
}
dstobj = nil
Expand Down Expand Up @@ -865,17 +862,18 @@ func produce(tasks chan<- object.Object, srckeys, dstkeys <-chan object.Object,
if config.DeleteDst {
if dstobj != nil {
if deleteFromDst(tasks, dstobj, config) {
return
return nil
}
}
for dstobj = range dstkeys {
if dstobj != nil {
if deleteFromDst(tasks, dstobj, config) {
return
return nil
}
}
}
}
return nil
}

type rule struct {
Expand Down Expand Up @@ -920,6 +918,8 @@ func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan obje
go func() {
for o := range keys {
if o == nil {
// Telling that the listing has failed
r <- nil
break
}
var ok bool
Expand Down Expand Up @@ -1123,8 +1123,7 @@ func startProducer(tasks chan<- object.Object, src, dst object.ObjectStorage, pr
}
close(dstkeys)
logger.Debugf("produce single key %s", config.Start)
produce(tasks, srckeys, dstkeys, config)
return nil
return produce(tasks, srckeys, dstkeys, config)
} else {
logger.Warnf("head %s from %s: %s", config.Start, dst, err)
}
Expand Down Expand Up @@ -1210,7 +1209,9 @@ func startProducer(tasks chan<- object.Object, src, dst object.ObjectStorage, pr
}
}
// sync returned objects
produce(tasks, srckeys, dstkeys, config)
if err := produce(tasks, srckeys, dstkeys, config); err != nil {
return err
}
// consume all the keys from dst
for range dstkeys {
}
Expand Down

0 comments on commit ca15ab5

Please sign in to comment.