Skip to content

Commit

Permalink
Merge pull request #14 from gfanton/fix/meta-heads
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored Mar 2, 2023
2 parents b9644b2 + fc745c8 commit a3b293a
Showing 1 changed file with 59 additions and 23 deletions.
82 changes: 59 additions & 23 deletions orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ func (s *BertyOrbitDB) setHeadsForGroup(ctx context.Context, g *protocoltypes.Gr
metaImpl = existingGC.metadataStore
messagesImpl = existingGC.messageStore
}

if metaImpl == nil || messagesImpl == nil {
groupID := g.GroupIDAsString()
s.groups.Store(groupID, g)
Expand Down Expand Up @@ -309,37 +308,74 @@ func (s *BertyOrbitDB) setHeadsForGroup(ctx context.Context, g *protocoltypes.Gr
return errcode.ErrInternal.Wrap(fmt.Errorf("metadata store is nil"))
}

messageHeadsEntries := make([]ipfslog.Entry, len(messageHeads))
for i, h := range messageHeads {
messageHeadsEntries[i] = &entry.Entry{Hash: h}
}
var wg sync.WaitGroup

messagesImpl.Replicator().Load(ctx, messageHeadsEntries)
// load and wait heads for metadata and message stores
wg.Add(2)

metaHeadsEntries := make([]ipfslog.Entry, len(metaHeads))
for i, h := range metaHeads {
metaHeadsEntries[i] = &entry.Entry{Hash: h}
}
go func() {
// load meta heads
if err := s.loadHeads(ctx, metaImpl, metaHeads); err != nil {
s.Logger().Error("unable to load metadata heads", zap.Error(err))
}

if len(metaHeads) == 0 {
return nil
}
wg.Done()
}()

chSub, err := metaImpl.EventBus().Subscribe(new(stores.EventReplicated))
go func() {
// load message heads
if err := s.loadHeads(ctx, messagesImpl, messageHeads); err != nil {
s.Logger().Error("unable to load message heads", zap.Error(err))
}

wg.Done()
}()

wg.Wait()

return nil
}

func (s *BertyOrbitDB) loadHeads(ctx context.Context, store iface.Store, heads []cid.Cid) (err error) {
sub, err := store.EventBus().Subscribe(new(stores.EventReplicated))
if err != nil {
// something is really wrong if this happens, so better return an error
return fmt.Errorf("unable to subscribe to EventReplicated")
}
defer chSub.Close()
defer sub.Close()

// check and generate missing entries if needed
headsEntries := make([]ipfslog.Entry, len(heads))
for i, h := range heads {
if _, ok := store.OpLog().Get(h); !ok {
headsEntries[i] = &entry.Entry{Hash: h}
}
}

// start to load metadata heads
metaImpl.Replicator().Load(ctx, metaHeadsEntries)
if len(headsEntries) == 0 {
return nil
}

// wait for load to finish
select {
case <-chSub.Out():
case <-s.ctx.Done():
return s.ctx.Err()
store.Replicator().Load(ctx, headsEntries)

for found := 0; found < len(heads); {
// wait for load to finish
select {
case e := <-sub.Out():
evt := e.(stores.EventReplicated)

// iterate over entries from replicated event to search for our heads
for _, headEntry := range headsEntries {
for _, evtEntry := range evt.Entries {
if evtEntry.Equals(headEntry) {
found++
break
}
}
}

case <-s.ctx.Done():
return s.ctx.Err()
}
}

return nil
Expand Down

0 comments on commit a3b293a

Please sign in to comment.