From aa39d15cde8af892acfc6119112e02043bfb0e64 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 16:27:07 +0900 Subject: [PATCH] netsync: add peerSubscription peerSubscription is added to Manager which will allow it subscribers to receive peers through the channel whenever the Manager is aware of a new peer that it's been connected to. This is useful to alert query.Workmanager that a new peer that's been connected to is eligible to download blocks from. --- go.mod | 1 + go.sum | 2 ++ netsync/manager.go | 41 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/go.mod b/go.mod index 1f445d9065..6e6a2cbfa4 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 // indirect + github.com/lightninglabs/neutrino v0.16.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.0 // indirect golang.org/x/net v0.24.0 // indirect diff --git a/go.sum b/go.sum index bb666c89de..3f3b346498 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0= +github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/netsync/manager.go b/netsync/manager.go index 3215a86ace..130723b897 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btcd/mempool" peerpkg "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/neutrino/query" ) const ( @@ -203,6 +204,7 @@ type SyncManager struct { headerList *list.List startHeader *list.Element nextCheckpoint *chaincfg.Checkpoint + peerSubscribers []*peerSubscription // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -452,6 +454,31 @@ func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { return true } +// notifyPeerSubscribers notifies all the current peer subscribers of the peer +// that was passed in. +func (sm *SyncManager) notifyPeerSubscribers(peer *peerpkg.Peer) { + // Loop for alerting subscribers to the new peer that was connected to. + n := 0 + for i, sub := range sm.peerSubscribers { + select { + // Quickly check whether this subscription has been canceled. + case <-sub.cancel: + // Avoid GC leak. + sm.peerSubscribers[i] = nil + continue + default: + } + + // Keep non-canceled subscribers around. + sm.peerSubscribers[n] = sub + n++ + + sub.peers <- peer + } + // Re-align the slice to only active subscribers. + sm.peerSubscribers = sm.peerSubscribers[:n] +} + // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. @@ -471,6 +498,13 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { requestedBlocks: make(map[chainhash.Hash]struct{}), } + // Only pass the peer off to the subscribers if we're able to sync off of + // the peer. + bestHeight := sm.chain.BestSnapshot().Height + if isSyncCandidate && peer.LastBlock() > bestHeight { + sm.notifyPeerSubscribers(peer) + } + // Start syncing by choosing the best candidate if needed. if isSyncCandidate && sm.syncPeer == nil { sm.startSync() @@ -1666,6 +1700,13 @@ func (sm *SyncManager) Pause() chan<- struct{} { return c } +// peerSubscription holds a peer subscription which we'll notify about any +// connected peers. +type peerSubscription struct { + peers chan<- query.Peer + cancel <-chan struct{} +} + // New constructs a new SyncManager. Use Start to begin processing asynchronous // block, tx, and inv updates. func New(config *Config) (*SyncManager, error) {