diff --git a/spv/backend.go b/spv/backend.go
index 8ffd3d8a7..8c834b860 100644
--- a/spv/backend.go
+++ b/spv/backend.go
@@ -148,7 +148,7 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockN
 	// watchdog interval means we'll try at least 4 different peers before
 	// resetting.
 	const watchdogTimeoutInterval = 2 * time.Minute
-	watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, time.Minute)
+	watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, watchdogTimeoutInterval)
 	defer cancelWatchdog()
 
 nextTry:
@@ -226,7 +226,7 @@ type headersBatch struct {
 //
 // This function returns a batch with the done flag set to true when no peers
 // have more recent blocks for syncing.
-func (s *Syncer) getHeaders(ctx context.Context) (*headersBatch, error) {
+func (s *Syncer) getHeaders(ctx context.Context, likelyBestChain []*wallet.BlockNode) (*headersBatch, error) {
 	cnet := s.wallet.ChainParams().Net
 
 nextbatch:
@@ -245,7 +245,7 @@ nextbatch:
 		log.Tracef("Attempting next batch of headers from %v", rp)
 
 		// Request headers from the selected peer.
-		locators, locatorHeight, err := s.wallet.BlockLocators(ctx, nil)
+		locators, locatorHeight, err := s.wallet.BlockLocators(ctx, likelyBestChain)
 		if err != nil {
 			return nil, err
 		}
diff --git a/spv/sync.go b/spv/sync.go
index 86c563e5c..4763b3496 100644
--- a/spv/sync.go
+++ b/spv/sync.go
@@ -1322,81 +1322,232 @@ var hashStop chainhash.Hash
 // is up to date with all connected peers. This is part of the startup sync
 // process.
 func (s *Syncer) initialSyncHeaders(ctx context.Context) error {
-	startTime := time.Now()
+	// The strategy for fetching initial headers is to split each batch of
+	// headers into 3 stages that are run in parallel, with each batch
+	// flowing through the pipeline:
+	//
+	// 1. Fetch headers
+	// 2. Fetch cfilters
+	// 3. Chain switch
+	//
+	// In the positive path during IBD, this allows processing the chain
+	// much faster, because the bottleneck is generally the latency for
+	// completing steps 1 and 2, and thus running them in parallel for
+	// each batch speeds up the overall sync time.
+	//
+	// The pipelining is accomplished by assuming that the next batch of
+	// headers will be a successor of the most recently fetched batch and
+	// requesting this next batch using locators derived from this (not yet
+	// completely processed) batch. For all but the most recent blocks in
+	// the main chain, this assumption will be true.
+	//
+	// Occasionally, in case of misbehaving peers, the sync process might be
+	// entirely reset and started from scratch using only the wallet derived
+	// block locators.
+	//
+	// The three stages are run as goroutines in the following g.
+	g, ctx := errgroup.WithContext(ctx)
 
-nextbatch:
-	for ctx.Err() == nil {
-		// Fetch a batch of headers.
-		batch, err := s.getHeaders(ctx)
-		if err != nil {
-			return err
-		}
-		if batch.done {
-			// All done.
-			log.Debugf("Initial sync completed in %s",
-				time.Since(startTime).Round(time.Second))
-			return nil
+	// invalidateBatchChan is closed when one of the later stages (cfilter
+	// fetching or chain switch) fails and the header fetching stage should
+	// restart without using the cached chain.
+	invalidateBatchChan := make(chan struct{})
+	var invalidateBatchMu sync.Mutex
+	invalidateBatch := func() {
+		invalidateBatchMu.Lock()
+		select {
+		case <-invalidateBatchChan:
+		default:
+			close(invalidateBatchChan)
 		}
+		invalidateBatchMu.Unlock()
+	}
+	refreshInvalidateBatchChan := func() {
+		invalidateBatchMu.Lock()
+		invalidateBatchChan = make(chan struct{})
+		invalidateBatchMu.Unlock()
+	}
 
-		bestChain := batch.bestChain
+	// Stage 1: fetch headers.
+	headersChan := make(chan *headersBatch)
+	g.Go(func() error {
+		var batch *headersBatch
+		var err error
+		for {
+			// If we have a previous batch, the next batch is
+			// likely to be a successor to it.
+			var likelyBestChain []*wallet.BlockNode
+			if batch != nil {
+				likelyBestChain = batch.bestChain
+			}
 
-		// Determine which nodes don't have cfilters yet.
-		s.sidechainMu.Lock()
-		var missingCfilter []*wallet.BlockNode
-		for i := range bestChain {
-			if bestChain[i].FilterV2 == nil {
-				missingCfilter = bestChain[i:]
-				break
+			// Fetch a batch of headers.
+			batch, err = s.getHeaders(ctx, likelyBestChain)
+			if err != nil {
+				return err
 			}
-		}
-		s.sidechainMu.Unlock()
 
-		// Fetch Missing CFilters.
-		err = s.cfiltersV2FromNodes(ctx, missingCfilter)
-		if err != nil {
-			log.Debugf("Unable to fetch missing cfilters from %v: %v",
-				batch.rp, err)
-			continue nextbatch
-		}
-		if len(missingCfilter) > 0 {
-			log.Debugf("Fetched %d new cfilters(s) ending at height %d",
-				len(missingCfilter), missingCfilter[len(missingCfilter)-1].Header.Height)
+			// Before sending to the next stage, check if the
+			// last one has already been invalidated while we
+			// were waiting for a getHeaders response.
+			select {
+			case <-invalidateBatchChan:
+				batch = nil
+				refreshInvalidateBatchChan()
+				continue
+			default:
+			}
+
+			// Otherwise, send to next stage.
+			select {
+			case headersChan <- batch:
+			case <-invalidateBatchChan:
+				batch = nil
+				refreshInvalidateBatchChan()
+				continue
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+
+			if batch.done {
+				return nil
+			}
 		}
+	})
 
-		// Switch the best chain, now that all cfilters have been
-		// fetched for it.
-		s.sidechainMu.Lock()
-		prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
-		if err != nil {
+	// Stage 2: fetch cfilters.
+	cfiltersChan := make(chan *headersBatch)
+	g.Go(func() error {
+		var batch *headersBatch
+		var err error
+		for {
+			// Wait for a batch of headers.
+			select {
+			case batch = <-headersChan:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+
+			// Once done, send the last batch forward and return.
+			if batch.done {
+				select {
+				case cfiltersChan <- batch:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+				return nil
+			}
+
+			// Determine which nodes don't have cfilters yet.
+			s.sidechainMu.Lock()
+			var missingCfilter []*wallet.BlockNode
+			for i := range batch.bestChain {
+				if batch.bestChain[i].FilterV2 == nil {
+					missingCfilter = batch.bestChain[i:]
+					break
+				}
+			}
 			s.sidechainMu.Unlock()
-			batch.rp.Disconnect(err)
-			continue nextbatch
-		}
-		if len(prevChain) != 0 {
-			log.Infof("Reorganize from %v to %v (total %d block(s) reorged)",
-				prevChain[len(prevChain)-1].Hash, bestChain[len(bestChain)-1].Hash, len(prevChain))
-			for _, n := range prevChain {
-				s.sidechains.AddBlockNode(n)
+
+			// Fetch Missing CFilters.
+			err = s.cfiltersV2FromNodes(ctx, missingCfilter)
+			if errors.Is(err, errCfilterWatchdogTriggered) {
+				log.Debugf("Unable to fetch missing cfilters from %v: %v",
+					batch.rp, err)
+				invalidateBatch()
+				continue
+			}
+			if err != nil {
+				return err
+			}
+
+			if len(missingCfilter) > 0 {
+				lastHeight := missingCfilter[len(missingCfilter)-1].Header.Height
+				log.Debugf("Fetched %d new cfilters(s) ending at height %d",
+					len(missingCfilter), lastHeight)
+			}
+
+			// Pass the batch to the next stage.
+			select {
+			case cfiltersChan <- batch:
+			case <-ctx.Done():
+				return ctx.Err()
 			}
 		}
-		tip := bestChain[len(bestChain)-1]
-		if len(bestChain) == 1 {
-			log.Infof("Connected block %v, height %d", tip.Hash, tip.Header.Height)
-		} else {
-			log.Infof("Connected %d blocks, new tip %v, height %d, date %v",
-				len(bestChain), tip.Hash, tip.Header.Height, tip.Header.Timestamp)
-		}
-		s.fetchHeadersProgress(tip.Header)
+	})
 
-		s.sidechainMu.Unlock()
+	// Stage 3: chain switch.
+	g.Go(func() error {
+		startTime := time.Now()
+		for ctx.Err() == nil {
+			// Fetch a batch with cfilters filled in.
+			var batch *headersBatch
+			select {
+			case batch = <-cfiltersChan:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
 
-		// Peers should not be significantly behind the new tip.
-		s.setRequiredHeight(int32(tip.Header.Height))
-		s.disconnectStragglers(int32(tip.Header.Height))
-	}
+			if batch.done {
+				// All done.
+				log.Debugf("Initial sync completed in %s",
+					time.Since(startTime).Round(time.Second))
+				return nil
+			}
+
+			// Switch the best chain, now that all cfilters have been
+			// fetched for it.
+			s.sidechainMu.Lock()
+			bestChain := batch.bestChain
+
+			// When the first N blocks of bestChain have already been added
+			// to mainchain tip, they don't have to be added again. This
+			// happens when requesting batches ahead of time.
+			tipHash, tipHeight := s.wallet.MainChainTip(ctx)
+			tipIndex := tipHeight - int32(bestChain[0].Header.Height)
+			if tipIndex > 0 && tipIndex < int32(len(bestChain)-1) && *bestChain[tipIndex].Hash == tipHash {
+				log.Tracef("Updating bestChain to tipIndex %d from %d to %d",
+					tipIndex, bestChain[0].Header.Height,
+					bestChain[tipIndex+1].Header.Height)
+				bestChain = bestChain[tipIndex+1:]
+			}
+
+			// Switch to the new main chain.
+			prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
+			if err != nil {
+				s.sidechainMu.Unlock()
+				batch.rp.Disconnect(err)
+				invalidateBatch()
+				continue
+			}
 
-	return ctx.Err()
+			if len(prevChain) != 0 {
+				log.Infof("Reorganize from %v to %v (total %d block(s) reorged)",
+					prevChain[len(prevChain)-1].Hash, bestChain[len(bestChain)-1].Hash, len(prevChain))
+				for _, n := range prevChain {
+					s.sidechains.AddBlockNode(n)
+				}
+			}
+			tip := bestChain[len(bestChain)-1]
+			if len(bestChain) == 1 {
+				log.Infof("Connected block %v, height %d", tip.Hash, tip.Header.Height)
+			} else {
+				log.Infof("Connected %d blocks, new tip %v, height %d, date %v",
+					len(bestChain), tip.Hash, tip.Header.Height, tip.Header.Timestamp)
+			}
+			s.fetchHeadersProgress(tip.Header)
+
+			s.sidechainMu.Unlock()
+
+			// Peers should not be significantly behind the new tip.
+			s.setRequiredHeight(int32(tip.Header.Height))
+			s.disconnectStragglers(int32(tip.Header.Height))
+		}
+
+		return ctx.Err()
+	})
+
+	return g.Wait()
 }
 
 // initialSyncRescan performs account and address discovery and rescans blocks
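
The pipeline described in the new initialSyncHeaders comment can be illustrated in isolation. The sketch below is not part of the patch; it only shows the general shape of a three-stage errgroup pipeline in which batches flow through unbuffered channels and a final batch carrying a done flag shuts every stage down. The batch type, heights, and stage bodies are hypothetical stand-ins for the wallet's headersBatch flow.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// batch is a hypothetical stand-in for the wallet's headersBatch: a unit of
// work flowing through the pipeline, with done marking the final batch.
type batch struct {
	height int
	done   bool
}

func runPipeline(ctx context.Context) error {
	g, ctx := errgroup.WithContext(ctx)

	headers := make(chan *batch)  // stage 1 -> stage 2
	cfilters := make(chan *batch) // stage 2 -> stage 3

	// Stage 1: "fetch headers". Produces successive batches and finishes
	// after emitting a batch with done set.
	g.Go(func() error {
		for i := 0; ; i++ {
			b := &batch{height: (i + 1) * 2000, done: i == 2}
			select {
			case headers <- b:
			case <-ctx.Done():
				return ctx.Err()
			}
			if b.done {
				return nil
			}
		}
	})

	// Stage 2: "fetch cfilters". Consumes header batches, would do its own
	// network round trips here, and forwards each batch (including the
	// final done marker) to stage 3.
	g.Go(func() error {
		for {
			var b *batch
			select {
			case b = <-headers:
			case <-ctx.Done():
				return ctx.Err()
			}
			select {
			case cfilters <- b:
			case <-ctx.Done():
				return ctx.Err()
			}
			if b.done {
				return nil
			}
		}
	})

	// Stage 3: "chain switch". Consumes completed batches until it sees
	// the done marker.
	g.Go(func() error {
		for {
			var b *batch
			select {
			case b = <-cfilters:
			case <-ctx.Done():
				return ctx.Err()
			}
			if b.done {
				return nil
			}
			fmt.Println("connected batch ending at height", b.height)
		}
	})

	// Any stage returning an error cancels ctx and unblocks the others.
	return g.Wait()
}

func main() {
	if err := runPipeline(context.Background()); err != nil {
		fmt.Println("pipeline error:", err)
	}
}

Because the channels are unbuffered, each stage can only run one batch ahead of the next, which bounds memory use while still overlapping the network latency of the earlier stages with the processing of the later ones.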
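Similarly, the invalidateBatch/refreshInvalidateBatchChan helpers added by the patch rely on a signal channel that is closed at most once and can later be re-armed. A standalone sketch of that pattern, with hypothetical names and a mutex guarding both the close and the reads of the re-creatable channel, might look like this:

package main

import (
	"fmt"
	"sync"
)

// resettableSignal is a hypothetical helper mirroring the shape of the
// patch's invalidateBatch/refreshInvalidateBatchChan pair: fire closes the
// current channel at most once, and reset installs a fresh channel so the
// signal can be fired again for the next batch.
type resettableSignal struct {
	mu sync.Mutex
	ch chan struct{}
}

func newResettableSignal() *resettableSignal {
	return &resettableSignal{ch: make(chan struct{})}
}

// fire closes the current channel unless it has already been closed, so
// repeated failures cannot panic on a double close.
func (s *resettableSignal) fire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case <-s.ch:
		// Already fired.
	default:
		close(s.ch)
	}
}

// reset re-arms the signal with a fresh channel.
func (s *resettableSignal) reset() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ch = make(chan struct{})
}

// wait returns the channel a consumer should select on.
func (s *resettableSignal) wait() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.ch
}

func main() {
	sig := newResettableSignal()
	sig.fire()
	sig.fire() // a no-op: the channel is already closed
	<-sig.wait()
	sig.reset()
	select {
	case <-sig.wait():
		fmt.Println("unexpected: signal should be re-armed")
	default:
		fmt.Println("signal fired once and was re-armed")
	}
}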