Skip to content

Commit

Permalink
netsync: add peerSubscription
Browse files Browse the repository at this point in the history
peerSubscription is added to Manager which will allow it subscribers to
receive peers through the channel whenever the Manager is aware of a new
peer that it's been connected to.  This is useful to alert
query.Workmanager that a new peer that's been connected to is eligible
to download blocks from.
  • Loading branch information
kcalvinalvin committed Dec 26, 2024
1 parent 858cef3 commit aa39d15
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 // indirect
github.com/lightninglabs/neutrino v0.16.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/net v0.24.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0=
github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
41 changes: 41 additions & 0 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btcd/mempool"
peerpkg "github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/query"
)

const (
Expand Down Expand Up @@ -203,6 +204,7 @@ type SyncManager struct {
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
peerSubscribers []*peerSubscription

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
Expand Down Expand Up @@ -452,6 +454,31 @@ func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
return true
}

// notifyPeerSubscribers notifies all the current peer subscribers of the peer
// that was passed in.
func (sm *SyncManager) notifyPeerSubscribers(peer *peerpkg.Peer) {
// Loop for alerting subscribers to the new peer that was connected to.
n := 0
for i, sub := range sm.peerSubscribers {
select {
// Quickly check whether this subscription has been canceled.
case <-sub.cancel:
// Avoid GC leak.
sm.peerSubscribers[i] = nil
continue
default:
}

// Keep non-canceled subscribers around.
sm.peerSubscribers[n] = sub
n++

sub.peers <- peer
}
// Re-align the slice to only active subscribers.
sm.peerSubscribers = sm.peerSubscribers[:n]
}

// handleNewPeerMsg deals with new peers that have signalled they may
// be considered as a sync peer (they have already successfully negotiated). It
// also starts syncing if needed. It is invoked from the syncHandler goroutine.
Expand All @@ -471,6 +498,13 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
requestedBlocks: make(map[chainhash.Hash]struct{}),
}

// Only pass the peer off to the subscribers if we're able to sync off of
// the peer.
bestHeight := sm.chain.BestSnapshot().Height
if isSyncCandidate && peer.LastBlock() > bestHeight {
sm.notifyPeerSubscribers(peer)
}

// Start syncing by choosing the best candidate if needed.
if isSyncCandidate && sm.syncPeer == nil {
sm.startSync()
Expand Down Expand Up @@ -1666,6 +1700,13 @@ func (sm *SyncManager) Pause() chan<- struct{} {
return c
}

// peerSubscription holds a peer subscription which we'll notify about any
// connected peers.
type peerSubscription struct {
peers chan<- query.Peer
cancel <-chan struct{}
}

// New constructs a new SyncManager. Use Start to begin processing asynchronous
// block, tx, and inv updates.
func New(config *Config) (*SyncManager, error) {
Expand Down

0 comments on commit aa39d15

Please sign in to comment.