spv: Process header batches in parallel
This switches the initial SPV header fetching so that each batch of headers is
processed in parallel through its various stages.
matheusd committed Dec 5, 2023
1 parent 6693162 commit 4aeb5cb
Showing 2 changed files with 214 additions and 63 deletions.
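The diff below is easiest to read as a producer/consumer pipeline. The following is a minimal, illustrative sketch of the same three-stage shape (fetch headers, fetch cfilters, chain switch) wired up with golang.org/x/sync/errgroup and unbuffered channels; the batch type, the stage bodies, and runPipeline are simplified stand-ins for illustration only, not dcrwallet's actual types or APIs.

// Illustrative only: the three-stage pipeline shape adopted by this commit,
// with hypothetical stand-in types. Stage 1 produces header batches, stage 2
// attaches cfilters, stage 3 extends the main chain.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// batch is a stand-in for the wallet's real headersBatch type.
type batch struct {
	id   int
	done bool // true for the final batch, which shuts the pipeline down
}

func runPipeline(ctx context.Context) error {
	g, ctx := errgroup.WithContext(ctx)
	headersChan := make(chan *batch)
	cfiltersChan := make(chan *batch)

	// Stage 1: fetch header batches; the last one is marked done.
	g.Go(func() error {
		for i := 0; ; i++ {
			b := &batch{id: i, done: i == 2} // pretend there are 3 batches
			select {
			case headersChan <- b:
			case <-ctx.Done():
				return ctx.Err()
			}
			if b.done {
				return nil
			}
		}
	})

	// Stage 2: fetch cfilters for each batch, then forward it.
	g.Go(func() error {
		for {
			var b *batch
			select {
			case b = <-headersChan:
			case <-ctx.Done():
				return ctx.Err()
			}
			// ... fetch cfilters for b here ...
			select {
			case cfiltersChan <- b:
			case <-ctx.Done():
				return ctx.Err()
			}
			if b.done {
				return nil
			}
		}
	})

	// Stage 3: switch the main chain to each completed batch.
	g.Go(func() error {
		for {
			var b *batch
			select {
			case b = <-cfiltersChan:
			case <-ctx.Done():
				return ctx.Err()
			}
			if b.done {
				return nil
			}
			fmt.Println("chain switch for batch", b.id)
		}
	})

	return g.Wait()
}

func main() {
	if err := runPipeline(context.Background()); err != nil {
		fmt.Println("sync error:", err)
	}
}

Because the channels are unbuffered, each stage holds at most one batch at a time and hands it off only when the next stage is ready, while the slow network-bound stages overlap across consecutive batches.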
6 changes: 3 additions & 3 deletions spv/backend.go
@@ -148,7 +148,7 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockNode
 	// watchdog interval means we'll try at least 4 different peers before
 	// resetting.
 	const watchdogTimeoutInterval = 2 * time.Minute
-	watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, time.Minute)
+	watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, watchdogTimeoutInterval)
 	defer cancelWatchdog()
 
 nextTry:
@@ -226,7 +226,7 @@ type headersBatch struct {
 //
 // This function returns a batch with the done flag set to true when no peers
 // have more recent blocks for syncing.
-func (s *Syncer) getHeaders(ctx context.Context) (*headersBatch, error) {
+func (s *Syncer) getHeaders(ctx context.Context, likelyBestChain []*wallet.BlockNode) (*headersBatch, error) {
 	cnet := s.wallet.ChainParams().Net
 
 nextbatch:
@@ -245,7 +245,7 @@ nextbatch:
 		log.Tracef("Attempting next batch of headers from %v", rp)
 
 		// Request headers from the selected peer.
-		locators, locatorHeight, err := s.wallet.BlockLocators(ctx, nil)
+		locators, locatorHeight, err := s.wallet.BlockLocators(ctx, likelyBestChain)
 		if err != nil {
 			return nil, err
 		}
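For context on the new likelyBestChain parameter: a block locator is a sparse list of block hashes, dense near the tip and exponentially sparser toward genesis, that lets the remote peer find a common fork point and serve the headers that follow it. Deriving the locators from the batch that is still in flight means the next getheaders request starts where that batch ends. The sketch below shows only the generic locator shape, assuming a plain slice of hashes; it is not dcrwallet's BlockLocators implementation.

// Illustrative only: generic block-locator construction over a chain given as
// hashes ordered from genesis to tip. The Hash type and the exact step
// schedule are simplified stand-ins for the wallet's real implementation.
package locator

type Hash [32]byte

// Locators walks back from the tip: the most recent 10 blocks one by one,
// then with a doubling step, and always ends with the genesis hash.
func Locators(chain []Hash) []Hash {
	if len(chain) == 0 {
		return nil
	}
	var out []Hash
	step := 1
	for i := len(chain) - 1; i >= 0; i -= step {
		out = append(out, chain[i])
		if len(out) > 10 {
			step *= 2
		}
	}
	if out[len(out)-1] != chain[0] {
		out = append(out, chain[0])
	}
	return out
}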
271 changes: 211 additions & 60 deletions spv/sync.go
@@ -1322,81 +1322,232 @@ var hashStop chainhash.Hash
 // is up to date with all connected peers. This is part of the startup sync
 // process.
 func (s *Syncer) initialSyncHeaders(ctx context.Context) error {
-	startTime := time.Now()
+	// The strategy for fetching initial headers is to split each batch of
+	// headers into 3 stages that are run in parallel, with each batch
+	// flowing through the pipeline:
+	//
+	// 1. Fetch headers
+	// 2. Fetch cfilters
+	// 3. Chain switch
+	//
+	// In the positive path during IBD, this allows processing the chain
+	// much faster, because the bottleneck is generally the latency for
+	// completing steps 1 and 2, and thus running them in parallel for
+	// each batch speeds up the overall sync time.
+	//
+	// The pipelining is accomplished by assuming that the next batch of
+	// headers will be a successor of the most recently fetched batch and
+	// requesting this next batch using locators derived from this (not yet
+	// completely processed) batch. For all but the most recent blocks in
+	// the main chain, this assumption will be true.
+	//
+	// Occasionally, in case of misbehaving peers, the sync process might be
+	// entirely reset and started from scratch using only the wallet derived
+	// block locators.
+	//
+	// The three stages are run as goroutines in the following g.
+	g, ctx := errgroup.WithContext(ctx)
 
-nextbatch:
-	for ctx.Err() == nil {
-		// Fetch a batch of headers.
-		batch, err := s.getHeaders(ctx)
-		if err != nil {
-			return err
-		}
-		if batch.done {
-			// All done.
-			log.Debugf("Initial sync completed in %s",
-				time.Since(startTime).Round(time.Second))
-			return nil
+	// invalidateBatchChan is closed when one of the later stages (cfilter
+	// fetching or chain switch) fails and the header fetching stage should
+	// restart without using the cached chain.
+	invalidateBatchChan := make(chan struct{})
+	var invalidateBatchMu sync.Mutex
+	invalidateBatch := func() {
+		invalidateBatchMu.Lock()
+		select {
+		case <-invalidateBatchChan:
+		default:
+			close(invalidateBatchChan)
 		}
+		invalidateBatchMu.Unlock()
+	}
+	refreshInvalidateBatchChan := func() {
+		invalidateBatchMu.Lock()
+		invalidateBatchChan = make(chan struct{})
+		invalidateBatchMu.Unlock()
+	}
 
-		bestChain := batch.bestChain
+	// Stage 1: fetch headers.
+	headersChan := make(chan *headersBatch)
+	g.Go(func() error {
+		var batch *headersBatch
+		var err error
+		for {
+			// If we have a previous batch, the next batch is
+			// likely to be a successor to it.
+			var likelyBestChain []*wallet.BlockNode
+			if batch != nil {
+				likelyBestChain = batch.bestChain
+			}
 
-		// Determine which nodes don't have cfilters yet.
-		s.sidechainMu.Lock()
-		var missingCfilter []*wallet.BlockNode
-		for i := range bestChain {
-			if bestChain[i].FilterV2 == nil {
-				missingCfilter = bestChain[i:]
-				break
+			// Fetch a batch of headers.
+			batch, err = s.getHeaders(ctx, likelyBestChain)
+			if err != nil {
+				return err
			}
-		}
-		s.sidechainMu.Unlock()
 
-		// Fetch Missing CFilters.
-		err = s.cfiltersV2FromNodes(ctx, missingCfilter)
-		if err != nil {
-			log.Debugf("Unable to fetch missing cfilters from %v: %v",
-				batch.rp, err)
-			continue nextbatch
-		}
-		if len(missingCfilter) > 0 {
-			log.Debugf("Fetched %d new cfilters(s) ending at height %d",
-				len(missingCfilter), missingCfilter[len(missingCfilter)-1].Header.Height)
+			// Before sending to the next stage, check if the
+			// last one has already been invalidated while we
+			// were waiting for a getHeaders response.
+			select {
+			case <-invalidateBatchChan:
+				batch = nil
+				refreshInvalidateBatchChan()
+				continue
+			default:
+			}
+
+			// Otherwise, send to next stage.
+			select {
+			case headersChan <- batch:
+			case <-invalidateBatchChan:
+				batch = nil
+				refreshInvalidateBatchChan()
+				continue
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+
+			if batch.done {
+				return nil
+			}
		}
+	})
 
-		// Switch the best chain, now that all cfilters have been
-		// fetched for it.
-		s.sidechainMu.Lock()
-		prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
-		if err != nil {
+	// Stage 2: fetch cfilters.
+	cfiltersChan := make(chan *headersBatch)
+	g.Go(func() error {
+		var batch *headersBatch
+		var err error
+		for {
+			// Wait for a batch of headers.
+			select {
+			case batch = <-headersChan:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+
+			// Once done, send the last batch forward and return.
+			if batch.done {
+				select {
+				case cfiltersChan <- batch:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+				return nil
+			}
+
+			// Determine which nodes don't have cfilters yet.
+			s.sidechainMu.Lock()
+			var missingCfilter []*wallet.BlockNode
+			for i := range batch.bestChain {
+				if batch.bestChain[i].FilterV2 == nil {
+					missingCfilter = batch.bestChain[i:]
+					break
+				}
+			}
 			s.sidechainMu.Unlock()
-			batch.rp.Disconnect(err)
-			continue nextbatch
-		}
 
-		if len(prevChain) != 0 {
-			log.Infof("Reorganize from %v to %v (total %d block(s) reorged)",
-				prevChain[len(prevChain)-1].Hash, bestChain[len(bestChain)-1].Hash, len(prevChain))
-			for _, n := range prevChain {
-				s.sidechains.AddBlockNode(n)
+			// Fetch Missing CFilters.
+			err = s.cfiltersV2FromNodes(ctx, missingCfilter)
+			if errors.Is(err, errCfilterWatchdogTriggered) {
+				log.Debugf("Unable to fetch missing cfilters from %v: %v",
+					batch.rp, err)
+				invalidateBatch()
+				continue
			}
+			if err != nil {
+				return err
+			}
+
+			if len(missingCfilter) > 0 {
+				lastHeight := missingCfilter[len(missingCfilter)-1].Header.Height
+				log.Debugf("Fetched %d new cfilters(s) ending at height %d",
+					len(missingCfilter), lastHeight)
+			}
+
+			// Pass the batch to the next stage.
+			select {
+			case cfiltersChan <- batch:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
		}
-		tip := bestChain[len(bestChain)-1]
-		if len(bestChain) == 1 {
-			log.Infof("Connected block %v, height %d", tip.Hash, tip.Header.Height)
-		} else {
-			log.Infof("Connected %d blocks, new tip %v, height %d, date %v",
-				len(bestChain), tip.Hash, tip.Header.Height, tip.Header.Timestamp)
-		}
-		s.fetchHeadersProgress(tip.Header)
+	})
 
-		s.sidechainMu.Unlock()
+	// Stage 3: chain switch.
+	g.Go(func() error {
+		startTime := time.Now()
+		for ctx.Err() == nil {
+			// Fetch a batch with cfilters filled in.
+			var batch *headersBatch
+			select {
+			case batch = <-cfiltersChan:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
 
-		// Peers should not be significantly behind the new tip.
-		s.setRequiredHeight(int32(tip.Header.Height))
-		s.disconnectStragglers(int32(tip.Header.Height))
-	}
+			if batch.done {
+				// All done.
+				log.Debugf("Initial sync completed in %s",
+					time.Since(startTime).Round(time.Second))
+				return nil
+			}
+
+			// Switch the best chain, now that all cfilters have been
+			// fetched for it.
+			s.sidechainMu.Lock()
+			bestChain := batch.bestChain
+
+			// When the first N blocks of bestChain have already been added
+			// to mainchain tip, they don't have to be added again. This
+			// happens when requesting batches ahead of time.
+			tipHash, tipHeight := s.wallet.MainChainTip(ctx)
+			tipIndex := tipHeight - int32(bestChain[0].Header.Height)
+			if tipIndex > 0 && tipIndex < int32(len(bestChain)-1) && *bestChain[tipIndex].Hash == tipHash {
+				log.Tracef("Updating bestChain to tipIndex %d from %d to %d",
+					tipIndex, bestChain[0].Header.Height,
+					bestChain[tipIndex+1].Header.Height)
+				bestChain = bestChain[tipIndex+1:]
+			}
+
+			// Switch to the new main chain.
+			prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
+			if err != nil {
+				s.sidechainMu.Unlock()
+				batch.rp.Disconnect(err)
+				invalidateBatch()
+				continue
+			}
 
-	return ctx.Err()
+			if len(prevChain) != 0 {
+				log.Infof("Reorganize from %v to %v (total %d block(s) reorged)",
+					prevChain[len(prevChain)-1].Hash, bestChain[len(bestChain)-1].Hash, len(prevChain))
+				for _, n := range prevChain {
+					s.sidechains.AddBlockNode(n)
+				}
+			}
+			tip := bestChain[len(bestChain)-1]
+			if len(bestChain) == 1 {
+				log.Infof("Connected block %v, height %d", tip.Hash, tip.Header.Height)
+			} else {
+				log.Infof("Connected %d blocks, new tip %v, height %d, date %v",
+					len(bestChain), tip.Hash, tip.Header.Height, tip.Header.Timestamp)
+			}
+			s.fetchHeadersProgress(tip.Header)
+
+			s.sidechainMu.Unlock()
+
+			// Peers should not be significantly behind the new tip.
+			s.setRequiredHeight(int32(tip.Header.Height))
+			s.disconnectStragglers(int32(tip.Header.Height))
+		}
+
+		return ctx.Err()
+	})
+
+	return g.Wait()
 }
 
 // initialSyncRescan performs account and address discovery and rescans blocks
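The invalidateBatch/refreshInvalidateBatchChan pair in the new code is a small close-once signal idiom: any later stage can close the channel to tell the header-fetching stage to throw away its cached chain, and the fetcher recreates the channel once it has reacted. Below is a standalone sketch of the same idiom with hypothetical names; it mirrors the shape of the code above rather than reproducing dcrwallet's version.

// Illustrative only: a close-once, recreatable signal channel, mirroring the
// invalidateBatch/refreshInvalidateBatchChan idiom with hypothetical names.
package main

import (
	"fmt"
	"sync"
)

type resetSignal struct {
	mu sync.Mutex
	ch chan struct{}
}

func newResetSignal() *resetSignal {
	return &resetSignal{ch: make(chan struct{})}
}

// trigger closes the current channel at most once; concurrent callers are safe.
func (r *resetSignal) trigger() {
	r.mu.Lock()
	defer r.mu.Unlock()
	select {
	case <-r.ch:
		// Already closed; nothing to do.
	default:
		close(r.ch)
	}
}

// refresh installs a fresh channel after a consumer has observed the signal.
func (r *resetSignal) refresh() {
	r.mu.Lock()
	r.ch = make(chan struct{})
	r.mu.Unlock()
}

// c returns the current channel for use in a select statement.
func (r *resetSignal) c() <-chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.ch
}

func main() {
	r := newResetSignal()
	r.trigger()
	select {
	case <-r.c():
		fmt.Println("signal observed; refreshing")
		r.refresh()
	default:
		fmt.Println("no signal")
	}
}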
