feat: add active peer probing and a cached addr book #90

Merged Dec 18, 2024 (80 commits)

Commits
3896470
feat: add cached peer book with higher ttls
2color Nov 27, 2024
7dd33ca
feat: initial implementation of active peer probing
2color Nov 27, 2024
0e86ea4
feat: use the cached router
2color Nov 27, 2024
ec2a67a
chore: go mod tidy
2color Nov 27, 2024
fe68140
feat: log probe duration
2color Nov 27, 2024
06c2d0c
chore: log in probe loop
2color Nov 27, 2024
fc76783
fix: update peer state if doesn't exist
2color Nov 27, 2024
e904c3e
fix: add addresses to cached address book
2color Nov 27, 2024
814ae58
fix: wrap with cached router only if available
2color Nov 27, 2024
a4d6456
feat: make everything a little bit better
2color Nov 27, 2024
81feca7
chore: small refinements
2color Nov 28, 2024
e75992f
test: add test for cached addr book
2color Nov 28, 2024
a20a4c3
chore: rename files
2color Nov 28, 2024
c5f1d62
feat: add options to cached addr book
2color Nov 28, 2024
e678be8
feat: add instrumentation
2color Nov 28, 2024
a0965bc
fix: thread safety
2color Nov 28, 2024
d82ad0f
docs: update changelog
2color Nov 28, 2024
a84d5f6
fix: small fixes
2color Nov 28, 2024
9ab02e1
fix: simplify cached router
2color Nov 28, 2024
9658af8
feat(metric): cached_router_peer_addr_lookups
lidel Nov 28, 2024
7cdb5be
Apply suggestions from code review
2color Nov 29, 2024
762136e
Update CHANGELOG.md
2color Nov 29, 2024
2cf46d4
chore: use service name for namespace
2color Nov 29, 2024
a0d5c62
fix: type errors and missing imports
2color Nov 29, 2024
75f1bf2
feat: add queue probe
2color Nov 29, 2024
4cbaa91
Revert "feat: add queue probe"
2color Nov 29, 2024
d038301
chore: simplify composite literal
2color Dec 2, 2024
796e94f
fix: implement custom cache fallback iterator
2color Dec 3, 2024
2e4d12c
fix: add cancel and simplify
2color Dec 3, 2024
811dce8
fix: move select to Val function
2color Dec 3, 2024
b4da9cd
fix: concurrency bug from the ongoingLookups
2color Dec 4, 2024
d00fcb4
chore: clean up comments
2color Dec 4, 2024
6219804
fix: add lint ignores
2color Dec 4, 2024
662f0d4
docs: update changelog
2color Dec 4, 2024
c812cf4
fix: increase bucket sizes for probe duration
2color Dec 4, 2024
8646f38
chore: remove unused peer state fields
2color Dec 4, 2024
46a74a3
feat: enable caching for FindPeer in cached router
2color Dec 4, 2024
d9601e4
fix: handle peer not found case
2color Dec 4, 2024
986b010
Apply suggestions from code review
2color Dec 5, 2024
ecd0757
fix: wait longer during cleanup function
2color Dec 5, 2024
a0443d0
test: remove bitswap record test
2color Dec 5, 2024
22aacd7
refactor: extract connectedness checks to a func
2color Dec 5, 2024
fe372ac
fix: set ttl for both signed and unsigned addrs
2color Dec 5, 2024
03a4078
fix: prevent race condition
2color Dec 5, 2024
84393fd
feat: use 2q-lru cache for peer state
2color Dec 5, 2024
d466dc7
chore: remove return count
2color Dec 5, 2024
8078cb5
test: improve reliability of tests
2color Dec 5, 2024
7decf6c
fix: record failed connections
2color Dec 5, 2024
b536e82
feat: add exponential backoff for probes/peer lookups
2color Dec 5, 2024
7182699
fix: return peers with no addrs that wont probe
2color Dec 5, 2024
b0b24e0
fix: brittle test
2color Dec 6, 2024
697457d
feat: add probed peers counter
2color Dec 6, 2024
7fcf45f
fix: adjust probe duration metric buckets
2color Dec 6, 2024
1718215
fix: prevent race conditions
2color Dec 6, 2024
dc57e9f
feat: increase cache size and add max backoff
2color Dec 6, 2024
c5abeec
fix: omit providers whose peer cannot be found
2color Dec 9, 2024
0cc76f9
chore: remove unused function
2color Dec 10, 2024
f0e0bd4
deps: upgrade go-libp2p
2color Dec 10, 2024
2211aae
fix: avoid using the cache in FindPeers
2color Dec 10, 2024
be5958a
fix: do not return cached results for FindPeers
2color Dec 11, 2024
af7c3a8
refactor: small optimisation
2color Dec 11, 2024
62c0d9f
chore: re-add comment
2color Dec 11, 2024
8b36b0c
Apply suggestions from code review
2color Dec 16, 2024
b58b50d
Apply suggestions from code review
2color Dec 16, 2024
41922af
fix: use separate context for dispatched jobs
2color Dec 16, 2024
06cef21
fix: ensure proper cleanup of cache fallback iter
2color Dec 16, 2024
7a2160a
Update main.go
2color Dec 16, 2024
84bc4f7
fix: formatting
2color Dec 16, 2024
0c28c6b
fix: let consumer handle cleanup
2color Dec 16, 2024
e0a601f
fix: remove from address book when removed from peer state
2color Dec 17, 2024
7f0ec50
fix: use normal lru cache instead of 2Q
2color Dec 17, 2024
2e025eb
fix: update the metric when removing from the peer cache
2color Dec 17, 2024
6b4b40d
fix: increase max backoff to 48 hours
2color Dec 17, 2024
fe7ad54
feat: add env var for recently connected ttl
2color Dec 17, 2024
49efe9b
feat: add env var to control active probing
2color Dec 17, 2024
8ca4d19
fix: bug from closing the iterator twice
2color Dec 17, 2024
317ccb7
docs: update comment
2color Dec 17, 2024
327f9cb
docs: improve changelog
2color Dec 17, 2024
48e1943
test: fix background test
2color Dec 17, 2024
c1ac41b
feat(metrics): track online vs offline probe ratio
lidel Dec 18, 2024
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,14 @@ The following emojis are used to highlight certain changes:

### Added

- Peer addresses are cached for 48h to match [provider record expiration on Amino DHT](https://github.com/libp2p/go-libp2p-kad-dht/blob/v0.28.1/amino/defaults.go#L40-L43).
- In the background, someguy probes cached peers at most once per hour (`PeerProbeThreshold`) by attempting to dial them to keep their multiaddrs up to date. If a peer is not reachable, an exponential backoff is applied to reduce the frequency of probing. If a cached peer is unreachable for more than 48h (`MaxBackoffDuration`), it is removed from the cache.
- Someguy now augments providers missing addresses in `FindProviders` with cached addresses. If a peer is encountered with no cached addresses, `FindPeer` is dispatched in the background and the result is streamed in the response. Providers for which no addresses can be found are omitted from the response.
- This can be enabled via `SOMEGUY_CACHED_ADDR_BOOK=true|false` (enabled by default)
- Two additional configuration options for the `cachedAddrBook` implementation:
  - `SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING` controls whether cached peers are actively probed in the background to keep their multiaddrs up to date.
  - `SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL` adjusts the TTL for cached addresses of recently connected peers.
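
For readers who want to see how the three settings above fit together, here is a minimal sketch of reading them from the environment. It is not taken from this PR: `addrBookConfig`, `loadAddrBookConfig`, and the `getenv` parameter are hypothetical names, the default of `true` for active probing is an assumption, and so is the idea that `SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL` takes a Go duration string such as `24h`. Only the variable names and the 48h default come from the changelog entry.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// defaultRecentTTL mirrors the 48h figure from the changelog entry.
const defaultRecentTTL = 48 * time.Hour

// addrBookConfig is a hypothetical container for the three settings.
type addrBookConfig struct {
	Enabled       bool
	ActiveProbing bool // assumed to default to true
	RecentTTL     time.Duration
}

// loadAddrBookConfig reads the settings through getenv and falls back to
// the defaults for any variable that is unset.
func loadAddrBookConfig(getenv func(string) (string, bool)) (addrBookConfig, error) {
	cfg := addrBookConfig{Enabled: true, ActiveProbing: true, RecentTTL: defaultRecentTTL}

	// parseBool overwrites dst only when the variable is set.
	parseBool := func(key string, dst *bool) error {
		v, ok := getenv(key)
		if !ok {
			return nil
		}
		b, err := strconv.ParseBool(v)
		if err != nil {
			return fmt.Errorf("%s: %w", key, err)
		}
		*dst = b
		return nil
	}

	if err := parseBool("SOMEGUY_CACHED_ADDR_BOOK", &cfg.Enabled); err != nil {
		return addrBookConfig{}, err
	}
	if err := parseBool("SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING", &cfg.ActiveProbing); err != nil {
		return addrBookConfig{}, err
	}
	// Assumption: the TTL is written as a Go duration string, e.g. "24h".
	if v, ok := getenv("SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL"); ok {
		d, err := time.ParseDuration(v)
		if err != nil {
			return addrBookConfig{}, fmt.Errorf("SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL: %w", err)
		}
		cfg.RecentTTL = d
	}
	return cfg, nil
}

func main() {
	cfg, err := loadAddrBookConfig(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function in, rather than calling `os.LookupEnv` directly, keeps the parsing testable without touching the process environment.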

### Changed

### Removed
354 changes: 354 additions & 0 deletions cached_addr_book.go
@@ -0,0 +1,354 @@
package main

import (
"context"
"io"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/boxo/routing/http/types"
"github.com/libp2p/go-libp2p-kad-dht/amino"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
Subsystem = "cached_addr_book"

// The default TTL to keep recently connected peers' multiaddrs for
DefaultRecentlyConnectedAddrTTL = amino.DefaultProvideValidity

// Connected peers don't expire until they disconnect
ConnectedAddrTTL = peerstore.ConnectedAddrTTL

// How long to wait since last connection before probing a peer again
PeerProbeThreshold = time.Hour

// How often to run the probe peers loop
ProbeInterval = time.Minute * 15

// How many concurrent probes to run at once
MaxConcurrentProbes = 20

// How long to wait for a connect in a probe to complete.
// The worst case is a peer behind a relay, so we use the relay connect timeout.
ConnectTimeout = relay.ConnectTimeout

// How many peers to cache in the peer state cache
// 1_000_000 is 10x the default number of signed peer records cached by the memory address book.
PeerCacheSize = 1_000_000

// Maximum backoff duration for probing a peer. After this duration, we will stop
// trying to connect to the peer and remove it from the cache.
MaxBackoffDuration = amino.DefaultProvideValidity

probeResult = "result"
probeResultOnline = "online"
probeResultOffline = "offline"
)

var (
probeDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "probe_duration_seconds",
Namespace: name,
Subsystem: Subsystem,
Help: "Duration of peer probing operations in seconds",
// Buckets probe durations from 5s to 15 minutes
Buckets: []float64{5, 10, 30, 60, 120, 300, 600, 900},
})

probedPeersCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "probed_peers",
Subsystem: Subsystem,
Namespace: name,
Help: "Number of peers probed",
},
[]string{probeResult},
)

peerStateSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "peer_state_size",
Subsystem: Subsystem,
Namespace: name,
Help: "Number of peer objects currently in the peer state",
})
)

type peerState struct {
lastConnTime time.Time // last time we successfully connected to this peer
lastFailedConnTime time.Time // last time we failed to find or connect to this peer
connectFailures uint // number of times we've failed to connect to this peer
}

type cachedAddrBook struct {
addrBook peerstore.AddrBook // memory address book
peerCache *lru.Cache[peer.ID, peerState] // LRU cache with additional metadata about peer
probingEnabled bool
isProbing atomic.Bool
allowPrivateIPs bool // for testing
recentlyConnectedTTL time.Duration
}

type AddrBookOption func(*cachedAddrBook) error

func WithAllowPrivateIPs() AddrBookOption {
return func(cab *cachedAddrBook) error {
cab.allowPrivateIPs = true
return nil
}
}

func WithRecentlyConnectedTTL(ttl time.Duration) AddrBookOption {
return func(cab *cachedAddrBook) error {
cab.recentlyConnectedTTL = ttl
return nil
}

}

func WithActiveProbing(enabled bool) AddrBookOption {
return func(cab *cachedAddrBook) error {
cab.probingEnabled = enabled
return nil
}

}

func newCachedAddrBook(opts ...AddrBookOption) (*cachedAddrBook, error) {
peerCache, err := lru.New[peer.ID, peerState](PeerCacheSize)
if err != nil {
return nil, err
}

cab := &cachedAddrBook{
peerCache: peerCache,
addrBook: pstoremem.NewAddrBook(),
recentlyConnectedTTL: DefaultRecentlyConnectedAddrTTL, // Set default value
}

for _, opt := range opts {
err := opt(cab)
if err != nil {
return nil, err
}

}
logger.Infof("Using TTL of %s for recently connected peers", cab.recentlyConnectedTTL)
logger.Infof("Probing enabled: %t", cab.probingEnabled)
return cab, nil
}
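
The constructor above follows the functional options pattern, where each option is a function that mutates the struct and may return an error. The following is a minimal standalone sketch of that pattern, not the PR's code: `book`, `Option`, `WithTTL`, `WithProbing`, and `newBook` are hypothetical names, and the validation inside `WithTTL` is added only to illustrate why an option might return an error (the PR's options always return `nil`).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// defaultTTL is the value used when no TTL option is supplied.
const defaultTTL = 48 * time.Hour

// book is a hypothetical stand-in for the struct being configured.
type book struct {
	ttl     time.Duration
	probing bool
}

// Option mutates a book during construction and may reject bad input.
type Option func(*book) error

// WithTTL overrides the default TTL. The positivity check is an
// illustrative assumption, not something the PR does.
func WithTTL(ttl time.Duration) Option {
	return func(b *book) error {
		if ttl <= 0 {
			return errors.New("ttl must be positive")
		}
		b.ttl = ttl
		return nil
	}
}

// WithProbing switches background probing on or off.
func WithProbing(enabled bool) Option {
	return func(b *book) error {
		b.probing = enabled
		return nil
	}
}

// newBook sets defaults first, then applies options in order and stops at
// the first error.
func newBook(opts ...Option) (*book, error) {
	b := &book{ttl: defaultTTL}
	for _, opt := range opts {
		if err := opt(b); err != nil {
			return nil, err
		}
	}
	return b, nil
}

func main() {
	b, err := newBook(WithTTL(time.Hour), WithProbing(true))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(b.ttl, b.probing)
}
```

Because defaults are assigned before the loop, callers that pass no options still get a usable value, and later options win over earlier ones.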

func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) {
sub, err := host.EventBus().Subscribe([]interface{}{
&event.EvtPeerIdentificationCompleted{},
&event.EvtPeerConnectednessChanged{},
})
if err != nil {
logger.Errorf("failed to subscribe to peer identification events: %v", err)
return
}

defer sub.Close()

probeTicker := time.NewTicker(ProbeInterval)
defer probeTicker.Stop()

for {
select {
case <-ctx.Done():
cabCloser, ok := cab.addrBook.(io.Closer)
if ok {
errClose := cabCloser.Close()
if errClose != nil {
logger.Warnf("failed to close addr book: %v", errClose)
}

}
return
case ev := <-sub.Out():
switch ev := ev.(type) {
case event.EvtPeerIdentificationCompleted:
pState, exists := cab.peerCache.Peek(ev.Peer)
if !exists {
pState = peerState{}
}
pState.lastConnTime = time.Now()
pState.lastFailedConnTime = time.Time{} // reset failed connection time
pState.connectFailures = 0 // reset connect failures on successful connection
cab.peerCache.Add(ev.Peer, pState)
peerStateSize.Set(float64(cab.peerCache.Len())) // update metric

ttl := cab.getTTL(host.Network().Connectedness(ev.Peer))
if ev.SignedPeerRecord != nil {
logger.Debug("Caching signed peer record")
// Use a distinct name so the receiver `cab` is not shadowed
certAddrBook, ok := peerstore.GetCertifiedAddrBook(cab.addrBook)
if ok {
_, err := certAddrBook.ConsumePeerRecord(ev.SignedPeerRecord, ttl)
if err != nil {
logger.Warnf("failed to consume signed peer record: %v", err)
}

}
} else {
logger.Debug("No signed peer record, caching listen addresses")
// We don't have a signed peer record, so we use the listen addresses
cab.addrBook.AddAddrs(ev.Peer, ev.ListenAddrs, ttl)
}
case event.EvtPeerConnectednessChanged:
// If the peer is neither connected nor limited (it has disconnected),
// downgrade its addresses from ConnectedAddrTTL to the recently connected TTL
if !hasValidConnectedness(ev.Connectedness) {
cab.addrBook.UpdateAddrs(ev.Peer, ConnectedAddrTTL, cab.recentlyConnectedTTL)
}

}
case <-probeTicker.C:
if !cab.probingEnabled {
logger.Debug("Probing disabled, skipping")
continue

}
if cab.isProbing.Load() {
logger.Debug("Skipping peer probe, still running")
continue

}
logger.Debug("Starting to probe peers")
cab.isProbing.Store(true)
go cab.probePeers(ctx, host)

}
}
}
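
The ticker branch above skips a run while the previous probe pass is still in flight, using a `Load` followed by a `Store` on an `atomic.Bool`. That is fine here because only the single `background` goroutine starts runs. As a hedged alternative, the sketch below shows the same guard with `CompareAndSwap`, which would stay correct even if several goroutines could trigger a run. The `runner` type and `tryRun` method are hypothetical and not part of this PR.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// runner is a hypothetical type that allows at most one job in flight.
type runner struct {
	busy atomic.Bool
}

// tryRun starts job in a new goroutine unless a previous job is still
// running. It reports whether the job was started; done is closed once the
// job has finished and the busy flag has been cleared (nil if not started).
func (r *runner) tryRun(job func()) (started bool, done <-chan struct{}) {
	// Atomically flip false -> true; fails if another job holds the flag.
	if !r.busy.CompareAndSwap(false, true) {
		return false, nil
	}
	ch := make(chan struct{})
	go func() {
		// Deferred calls run last-in first-out: the flag is cleared
		// before the channel is closed.
		defer close(ch)
		defer r.busy.Store(false)
		job()
	}()
	return true, ch
}

func main() {
	var r runner
	release := make(chan struct{})

	ok1, done := r.tryRun(func() { <-release }) // blocks until released
	ok2, _ := r.tryRun(func() {})               // rejected: first job still running
	close(release)
	<-done

	ok3, done3 := r.tryRun(func() {}) // accepted: flag was cleared
	<-done3

	fmt.Println(ok1, ok2, ok3) // prints "true false true"
}
```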

// Loops over all peers with addresses and probes them if they haven't been probed recently
func (cab *cachedAddrBook) probePeers(ctx context.Context, host host.Host) {
defer cab.isProbing.Store(false)

start := time.Now()
defer func() {
duration := time.Since(start).Seconds()
probeDurationHistogram.Observe(duration)
logger.Debugf("Finished probing peers in %.2fs", duration) // duration is a float64 number of seconds, so %s would misformat it
}()

var wg sync.WaitGroup
// semaphore channel to limit the number of concurrent probes
semaphore := make(chan struct{}, MaxConcurrentProbes)

for i, p := range cab.addrBook.PeersWithAddrs() {
if hasValidConnectedness(host.Network().Connectedness(p)) {
continue // don't probe connected peers

}

if !cab.ShouldProbePeer(p) {
continue

}

addrs := cab.addrBook.Addrs(p)

if !cab.allowPrivateIPs {
addrs = ma.FilterAddrs(addrs, manet.IsPublicAddr)
}

if len(addrs) == 0 {
continue // no addresses to probe

}

wg.Add(1)
semaphore <- struct{}{}
go func() {
defer func() {
<-semaphore // Release semaphore
wg.Done()
}()
ctx, cancel := context.WithTimeout(ctx, ConnectTimeout)
defer cancel()
logger.Debugf("Probe %d: PeerID: %s, Addrs: %v", i+1, p, addrs)
// if connect succeeds and identify runs, the background loop will take care of updating the peer state and cache
err := host.Connect(ctx, peer.AddrInfo{
ID: p,
Addrs: addrs,
})
if err != nil {
logger.Debugf("failed to connect to peer %s: %v", p, err)
cab.RecordFailedConnection(p)
probedPeersCounter.WithLabelValues(probeResultOffline).Inc()
} else {
probedPeersCounter.WithLabelValues(probeResultOnline).Inc()
}

}()
}
wg.Wait()
}
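
`probePeers` bounds its concurrency with a buffered channel used as a counting semaphore, plus a `sync.WaitGroup` to wait for stragglers. The standalone sketch below isolates that technique with the networking removed. `probeAll` and its `probe` callback are hypothetical stand-ins for the loop body and `host.Connect`, and the online/offline counts loosely mirror the `probed_peers` counter labels.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// probeAll calls probe for every target with at most limit calls in flight
// and returns how many probes reported online and offline.
// limit must be at least 1, otherwise the first send on sem blocks forever.
func probeAll(targets []string, limit int, probe func(string) bool) (online, offline int64) {
	var (
		wg       sync.WaitGroup
		onlineN  atomic.Int64
		offlineN atomic.Int64
	)
	sem := make(chan struct{}, limit) // buffered channel as counting semaphore

	for _, t := range targets {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit probes are already running
		go func(target string) {
			defer func() {
				<-sem // release the slot
				wg.Done()
			}()
			if probe(target) {
				onlineN.Add(1)
			} else {
				offlineN.Add(1)
			}
		}(t)
	}
	wg.Wait()
	return onlineN.Load(), offlineN.Load()
}

func main() {
	targets := []string{"peerA", "peerB", "peerC", "peerD", "peerE"}
	// Stand-in for a dial: pretend only peerC is unreachable.
	online, offline := probeAll(targets, 2, func(t string) bool { return t != "peerC" })
	fmt.Println("online:", online, "offline:", offline)
}
```

The PR's closure captures the loop variables `i` and `p` directly, which is safe with the per-iteration loop variables of Go 1.22 and later. The sketch passes the value as an argument so it behaves the same on older toolchains.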

// Returns the cached addresses for a peer, or nil if none are cached
func (cab *cachedAddrBook) GetCachedAddrs(p peer.ID) []types.Multiaddr {
cachedAddrs := cab.addrBook.Addrs(p)

if len(cachedAddrs) == 0 {
return nil
}

result := make([]types.Multiaddr, 0, len(cachedAddrs)) // convert to local Multiaddr type 🙃
for _, addr := range cachedAddrs {
result = append(result, types.Multiaddr{Multiaddr: addr})
}
return result
}

// Update the peer cache with information about a failed connection
// This should be called when a connection attempt to a peer fails
func (cab *cachedAddrBook) RecordFailedConnection(p peer.ID) {
pState, exists := cab.peerCache.Peek(p)
if !exists {
pState = peerState{}
}
now := time.Now()
// once probing of offline peer reached MaxBackoffDuration and still failed,
// we opportunistically remove the dead peer from cache to save time on probing it further
if exists && pState.connectFailures > 1 && now.Sub(pState.lastFailedConnTime) > MaxBackoffDuration {
cab.peerCache.Remove(p)
peerStateSize.Set(float64(cab.peerCache.Len())) // update metric
// remove the peer from the addr book. Otherwise it will be probed again in the probe loop
cab.addrBook.ClearAddrs(p)
return
}

pState.lastFailedConnTime = now
pState.connectFailures++
cab.peerCache.Add(p, pState)
}

// Returns true if we should probe a peer (either by dialing known addresses or by dispatching a FindPeer)
// based on the last failed connection time and connection failures
func (cab *cachedAddrBook) ShouldProbePeer(p peer.ID) bool {
pState, exists := cab.peerCache.Peek(p)
if !exists {
return true // default to probing if the peer is not in the cache
}

var backoffDuration time.Duration
if pState.connectFailures > 0 {
// Calculate backoff only if we have failures
// this is effectively 2^(connectFailures - 1) * PeerProbeThreshold
// A single failure results in a 1 hour backoff and each additional failure doubles the backoff
backoffDuration = PeerProbeThreshold * time.Duration(1<<(pState.connectFailures-1))
backoffDuration = min(backoffDuration, MaxBackoffDuration) // clamp to max backoff duration
} else {
backoffDuration = PeerProbeThreshold
}

// Only dispatch if we've waited long enough based on the backoff
return time.Since(pState.lastFailedConnTime) > backoffDuration
}
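
To make the backoff arithmetic in `ShouldProbePeer` concrete: with a 1h threshold and a 48h cap, consecutive failures give waits of 1h, 2h, 4h, 8h, 16h, 32h, and then 48h from the seventh failure onward. The sketch below reproduces that schedule in isolation. `backoffFor`, `probeThreshold`, and `maxBackoff` are hypothetical names mirroring `PeerProbeThreshold` and `MaxBackoffDuration`, and the early return for large failure counts is an added guard against shift overflow that the PR does not have (in the PR, peers are removed from the cache once they stay unreachable past the cap, so the count is not expected to grow that far).

```go
package main

import (
	"fmt"
	"time"
)

const (
	probeThreshold = time.Hour      // mirrors PeerProbeThreshold
	maxBackoff     = 48 * time.Hour // mirrors MaxBackoffDuration (48h per the changelog)
)

// backoffFor returns how long to wait after the last failed attempt before
// probing again: 2^(failures-1) * probeThreshold, clamped to maxBackoff.
func backoffFor(failures uint) time.Duration {
	if failures == 0 {
		return probeThreshold
	}
	// Added guard (not in the PR): 2^6 hours = 64h already exceeds the cap,
	// so larger counts can return early and never overflow the shift.
	if failures > 7 {
		return maxBackoff
	}
	d := probeThreshold << (failures - 1)
	if d > maxBackoff {
		return maxBackoff
	}
	return d
}

func main() {
	for f := uint(0); f <= 8; f++ {
		fmt.Printf("failures=%d backoff=%s\n", f, backoffFor(f))
	}
}
```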

func hasValidConnectedness(connectedness network.Connectedness) bool {
return connectedness == network.Connected || connectedness == network.Limited
}

func (cab *cachedAddrBook) getTTL(connectedness network.Connectedness) time.Duration {
if hasValidConnectedness(connectedness) {
return ConnectedAddrTTL
}

return cab.recentlyConnectedTTL
}