diff --git a/CHANGELOG.md b/CHANGELOG.md index 58d1c11..dbb763a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,14 @@ The following emojis are used to highlight certain changes: ### Added +- Peer addresses are cached for 48h to match [provider record expiration on Amino DHT](https://github.com/libp2p/go-libp2p-kad-dht/blob/v0.28.1/amino/defaults.go#L40-L43). +- In the background, someguy probes cached peers at most once per hour (`PeerProbeThreshold`) by attempting to dial them to keep their multiaddrs up to date. If a peer is not reachable, an exponential backoff is applied to reduce the frequency of probing. If a cached peer is unreachable for more than 48h (`MaxBackoffDuration`), it is removed from the cache. +- Someguy now augments providers that are missing addresses in `FindProviders` responses with cached addresses. If a peer has no cached addresses either, `FindPeer` is dispatched in the background and the result is streamed into the response. Providers for which no addresses can be found are omitted from the response. + - This can be enabled via `SOMEGUY_CACHED_ADDR_BOOK=true|false` (enabled by default) + - Two additional configuration options for the `cachedAddrBook` implementation: + - `SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING`: whether to actively probe cached peers in the background to keep their multiaddrs up to date. + - `SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL`: adjusts the TTL for cached addresses of recently connected peers. + ### Changed ### Removed diff --git a/cached_addr_book.go b/cached_addr_book.go new file mode 100644 index 0000000..75171a2 --- /dev/null +++ b/cached_addr_book.go @@ -0,0 +1,354 @@ +package main + +import ( + "context" + "io" + "sync" + "sync/atomic" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/ipfs/boxo/routing/http/types" + "github.com/libp2p/go-libp2p-kad-dht/amino" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" + "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + Subsystem = "cached_addr_book" + + // The default TTL to keep recently connected peers' multiaddrs for + DefaultRecentlyConnectedAddrTTL = amino.DefaultProvideValidity + + // Connected peers don't expire until they disconnect + ConnectedAddrTTL = peerstore.ConnectedAddrTTL + + // How long to wait since last connection before probing a peer again + PeerProbeThreshold = time.Hour + + // How often to run the probe peers loop + ProbeInterval = time.Minute * 15 + + // How many concurrent probes to run at once + MaxConcurrentProbes = 20 + + // How long to wait for a connect in a probe to complete. + // The worst case is a peer behind a relay, so we use the relay connect timeout. + ConnectTimeout = relay.ConnectTimeout + + // How many peers to cache in the peer state cache + // 1_000_000 is 10x the default number of signed peer records cached by the memory address book. + PeerCacheSize = 1_000_000 + + // Maximum backoff duration for probing a peer. After this duration, we will stop + // trying to connect to the peer and remove it from the cache.
+ MaxBackoffDuration = amino.DefaultProvideValidity + + probeResult = "result" + probeResultOnline = "online" + probeResultOffline = "offline" +) + +var ( + probeDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "probe_duration_seconds", + Namespace: name, + Subsystem: Subsystem, + Help: "Duration of peer probing operations in seconds", + // Buckets probe durations from 5s to 15 minutes + Buckets: []float64{5, 10, 30, 60, 120, 300, 600, 900}, + }) + + probedPeersCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "probed_peers", + Subsystem: Subsystem, + Namespace: name, + Help: "Number of peers probed", + }, + []string{probeResult}, + ) + + peerStateSize = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "peer_state_size", + Subsystem: Subsystem, + Namespace: name, + Help: "Number of peers object currently in the peer state", + }) +) + +type peerState struct { + lastConnTime time.Time // last time we successfully connected to this peer + lastFailedConnTime time.Time // last time we failed to find or connect to this peer + connectFailures uint // number of times we've failed to connect to this peer +} + +type cachedAddrBook struct { + addrBook peerstore.AddrBook // memory address book + peerCache *lru.Cache[peer.ID, peerState] // LRU cache with additional metadata about peer + probingEnabled bool + isProbing atomic.Bool + allowPrivateIPs bool // for testing + recentlyConnectedTTL time.Duration +} + +type AddrBookOption func(*cachedAddrBook) error + +func WithAllowPrivateIPs() AddrBookOption { + return func(cab *cachedAddrBook) error { + cab.allowPrivateIPs = true + return nil + } +} + +func WithRecentlyConnectedTTL(ttl time.Duration) AddrBookOption { + return func(cab *cachedAddrBook) error { + cab.recentlyConnectedTTL = ttl + return nil + } +} + +func WithActiveProbing(enabled bool) AddrBookOption { + return func(cab *cachedAddrBook) error { + cab.probingEnabled = enabled + return nil + } +} + +func newCachedAddrBook(opts ...AddrBookOption) (*cachedAddrBook, error) { + peerCache, err := lru.New[peer.ID, peerState](PeerCacheSize) + if err != nil { + return nil, err + } + + cab := &cachedAddrBook{ + peerCache: peerCache, + addrBook: pstoremem.NewAddrBook(), + recentlyConnectedTTL: DefaultRecentlyConnectedAddrTTL, // Set default value + } + + for _, opt := range opts { + err := opt(cab) + if err != nil { + return nil, err + } + } + logger.Infof("Using TTL of %s for recently connected peers", cab.recentlyConnectedTTL) + logger.Infof("Probing enabled: %t", cab.probingEnabled) + return cab, nil +} + +func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) { + sub, err := host.EventBus().Subscribe([]interface{}{ + &event.EvtPeerIdentificationCompleted{}, + &event.EvtPeerConnectednessChanged{}, + }) + if err != nil { + logger.Errorf("failed to subscribe to peer identification events: %v", err) + return + } + defer sub.Close() + + probeTicker := time.NewTicker(ProbeInterval) + defer probeTicker.Stop() + + for { + select { + case <-ctx.Done(): + cabCloser, ok := cab.addrBook.(io.Closer) + if ok { + errClose := cabCloser.Close() + if errClose != nil { + logger.Warnf("failed to close addr book: %v", errClose) + } + } + return + case ev := <-sub.Out(): + switch ev := ev.(type) { + case event.EvtPeerIdentificationCompleted: + pState, exists := cab.peerCache.Peek(ev.Peer) + if !exists { + pState = peerState{} + } + pState.lastConnTime = time.Now() + pState.lastFailedConnTime = time.Time{} // reset failed connection time + pState.connectFailures = 
0 // reset connect failures on successful connection + cab.peerCache.Add(ev.Peer, pState) + peerStateSize.Set(float64(cab.peerCache.Len())) // update metric + + ttl := cab.getTTL(host.Network().Connectedness(ev.Peer)) + if ev.SignedPeerRecord != nil { + logger.Debug("Caching signed peer record") + cab, ok := peerstore.GetCertifiedAddrBook(cab.addrBook) + if ok { + _, err := cab.ConsumePeerRecord(ev.SignedPeerRecord, ttl) + if err != nil { + logger.Warnf("failed to consume signed peer record: %v", err) + } + } + } else { + logger.Debug("No signed peer record, caching listen addresses") + // We don't have a signed peer record, so we use the listen addresses + cab.addrBook.AddAddrs(ev.Peer, ev.ListenAddrs, ttl) + } + case event.EvtPeerConnectednessChanged: + // If the peer is not connected or limited, we update the TTL + if !hasValidConnectedness(ev.Connectedness) { + cab.addrBook.UpdateAddrs(ev.Peer, ConnectedAddrTTL, cab.recentlyConnectedTTL) + } + } + case <-probeTicker.C: + if !cab.probingEnabled { + logger.Debug("Probing disabled, skipping") + continue + } + if cab.isProbing.Load() { + logger.Debug("Skipping peer probe, still running") + continue + } + logger.Debug("Starting to probe peers") + cab.isProbing.Store(true) + go cab.probePeers(ctx, host) + } + } +} + +// Loops over all peers with addresses and probes them if they haven't been probed recently +func (cab *cachedAddrBook) probePeers(ctx context.Context, host host.Host) { + defer cab.isProbing.Store(false) + + start := time.Now() + defer func() { + duration := time.Since(start).Seconds() + probeDurationHistogram.Observe(duration) + logger.Debugf("Finished probing peers in %s", duration) + }() + + var wg sync.WaitGroup + // semaphore channel to limit the number of concurrent probes + semaphore := make(chan struct{}, MaxConcurrentProbes) + + for i, p := range cab.addrBook.PeersWithAddrs() { + if hasValidConnectedness(host.Network().Connectedness(p)) { + continue // don't probe connected peers + } + + if !cab.ShouldProbePeer(p) { + continue + } + + addrs := cab.addrBook.Addrs(p) + + if !cab.allowPrivateIPs { + addrs = ma.FilterAddrs(addrs, manet.IsPublicAddr) + } + + if len(addrs) == 0 { + continue // no addresses to probe + } + + wg.Add(1) + semaphore <- struct{}{} + go func() { + defer func() { + <-semaphore // Release semaphore + wg.Done() + }() + ctx, cancel := context.WithTimeout(ctx, ConnectTimeout) + defer cancel() + logger.Debugf("Probe %d: PeerID: %s, Addrs: %v", i+1, p, addrs) + // if connect succeeds and identify runs, the background loop will take care of updating the peer state and cache + err := host.Connect(ctx, peer.AddrInfo{ + ID: p, + Addrs: addrs, + }) + if err != nil { + logger.Debugf("failed to connect to peer %s: %v", p, err) + cab.RecordFailedConnection(p) + probedPeersCounter.WithLabelValues(probeResultOffline).Inc() + } else { + probedPeersCounter.WithLabelValues(probeResultOnline).Inc() + } + }() + } + wg.Wait() +} + +// Returns the cached addresses for a peer, incrementing the return count +func (cab *cachedAddrBook) GetCachedAddrs(p peer.ID) []types.Multiaddr { + cachedAddrs := cab.addrBook.Addrs(p) + + if len(cachedAddrs) == 0 { + return nil + } + + result := make([]types.Multiaddr, 0, len(cachedAddrs)) // convert to local Multiaddr type 🙃 + for _, addr := range cachedAddrs { + result = append(result, types.Multiaddr{Multiaddr: addr}) + } + return result +} + +// Update the peer cache with information about a failed connection +// This should be called when a connection attempt to a peer fails +func (cab 
*cachedAddrBook) RecordFailedConnection(p peer.ID) { + pState, exists := cab.peerCache.Peek(p) + if !exists { + pState = peerState{} + } + now := time.Now() + // once probing of offline peer reached MaxBackoffDuration and still failed, + // we opportunistically remove the dead peer from cache to save time on probing it further + if exists && pState.connectFailures > 1 && now.Sub(pState.lastFailedConnTime) > MaxBackoffDuration { + cab.peerCache.Remove(p) + peerStateSize.Set(float64(cab.peerCache.Len())) // update metric + // remove the peer from the addr book. Otherwise it will be probed again in the probe loop + cab.addrBook.ClearAddrs(p) + return + } + pState.lastFailedConnTime = now + pState.connectFailures++ + cab.peerCache.Add(p, pState) +} + +// Returns true if we should probe a peer (either by dialing known addresses or by dispatching a FindPeer) +// based on the last failed connection time and connection failures +func (cab *cachedAddrBook) ShouldProbePeer(p peer.ID) bool { + pState, exists := cab.peerCache.Peek(p) + if !exists { + return true // default to probing if the peer is not in the cache + } + + var backoffDuration time.Duration + if pState.connectFailures > 0 { + // Calculate backoff only if we have failures + // this is effectively 2^(connectFailures - 1) * PeerProbeThreshold + // A single failure results in a 1 hour backoff and each additional failure doubles the backoff + backoffDuration = PeerProbeThreshold * time.Duration(1<<(pState.connectFailures-1)) + backoffDuration = min(backoffDuration, MaxBackoffDuration) // clamp to max backoff duration + } else { + backoffDuration = PeerProbeThreshold + } + + // Only dispatch if we've waited long enough based on the backoff + return time.Since(pState.lastFailedConnTime) > backoffDuration +} + +func hasValidConnectedness(connectedness network.Connectedness) bool { + return connectedness == network.Connected || connectedness == network.Limited +} + +func (cab *cachedAddrBook) getTTL(connectedness network.Connectedness) time.Duration { + if hasValidConnectedness(connectedness) { + return ConnectedAddrTTL + } + return cab.recentlyConnectedTTL +} diff --git a/cached_addr_book_test.go b/cached_addr_book_test.go new file mode 100644 index 0000000..a20dae1 --- /dev/null +++ b/cached_addr_book_test.go @@ -0,0 +1,235 @@ +package main + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCachedAddrBook(t *testing.T) { + // Create a new cached address book + cab, err := newCachedAddrBook(WithAllowPrivateIPs()) + require.NoError(t, err) + require.NotNil(t, cab) + require.NotNil(t, cab.peerCache) + require.NotNil(t, cab.addrBook) +} + +func TestBackground(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a real event bus + eventBus := eventbus.NewBus() + + emitter, err := eventBus.Emitter(new(event.EvtPeerIdentificationCompleted), eventbus.Stateful) + require.NoError(t, err) + + // Use a mock host with a real event bus + mockHost := &mockHost{ + eventBus: eventBus, + } + + cab, err := newCachedAddrBook(WithAllowPrivateIPs()) + require.NoError(t, err) + + ctx, cancel = context.WithTimeout(ctx, time.Second*5) + defer cancel() + go 
cab.background(ctx, mockHost) + + // Create a test peer + testPeer, err := peer.Decode("12D3KooWCZ67sU8oCvKd82Y6c9NgpqgoZYuZEUcg4upHCjK3n1aj") + require.NoError(t, err) + + // Create test address + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234") + require.NoError(t, err) + + // Emit a real peer identification event + err = emitter.Emit(event.EvtPeerIdentificationCompleted{ + Peer: testPeer, + Conn: &mockConnection{ + remoteAddr: addr, + }, + ListenAddrs: []ma.Multiaddr{addr}, + }) + require.NoError(t, err) + + // Wait for the peer to be added to the cache + require.Eventually(t, func() bool { + _, exists := cab.peerCache.Get(testPeer) + return exists + }, time.Second*3, time.Millisecond*100, "peer was not added to cache") + + // Verify peer state + pState, exists := cab.peerCache.Get(testPeer) + assert.True(t, exists) + assert.NotNil(t, pState) +} + +func TestProbePeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a test libp2p host + mockHost := &mockHost{} + + cab, err := newCachedAddrBook(WithAllowPrivateIPs()) + require.NoError(t, err) + + // Add a test peer with some addresses + testPeer, _ := peer.Decode("12D3KooWCZ67sU8oCvKd82Y6c9NgpqgoZYuZEUcg4upHCjK3n1aj") + addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234") + cab.addrBook.AddAddrs(testPeer, []ma.Multiaddr{addr}, time.Hour) + + // Initialize peer state with old connection time + cab.peerCache.Add(testPeer, peerState{ + lastConnTime: time.Now().Add(-2 * PeerProbeThreshold), + }) + + // Run probe with mockHost instead of h + cab.probePeers(ctx, mockHost) + + // Verify connect failures increased + pState, exists := cab.peerCache.Get(testPeer) + assert.True(t, exists) + assert.Equal(t, pState.connectFailures, uint(1)) +} + +func TestShouldProbePeer(t *testing.T) { + t.Parallel() + + cab, err := newCachedAddrBook() + require.NoError(t, err) + + testPeer := peer.ID("test-peer") + + tests := []struct { + name string + peerState peerState + expectedResult bool + }{ + { + name: "peer not in cache", + peerState: peerState{}, + expectedResult: true, + }, + { + name: "no failures, within threshold", + peerState: peerState{ + lastFailedConnTime: time.Now().Add(-30 * time.Minute), + connectFailures: 0, + }, + expectedResult: false, + }, + { + name: "no failures, beyond threshold", + peerState: peerState{ + lastFailedConnTime: time.Now().Add(-2 * PeerProbeThreshold), + connectFailures: 0, + }, + expectedResult: true, + }, + { + name: "one failure, within backoff", + peerState: peerState{ + lastFailedConnTime: time.Now().Add(-90 * time.Minute), + connectFailures: 1, + }, + expectedResult: true, + }, + { + name: "one failure, beyond backoff", + peerState: peerState{ + lastFailedConnTime: time.Now().Add(-3 * PeerProbeThreshold), + connectFailures: 1, + }, + expectedResult: true, + }, + { + name: "two failures, within backoff", + peerState: peerState{ + lastFailedConnTime: time.Now().Add(-90 * time.Minute), + connectFailures: 2, + }, + expectedResult: false, + }, + { + name: "two failures, beyond backoff", + peerState: peerState{ + lastFailedConnTime: time.Now().Add(-3 * PeerProbeThreshold), + connectFailures: 2, + }, + expectedResult: true, + }, + { + name: "never failed connection", + peerState: peerState{ + lastFailedConnTime: time.Time{}, // zero time + connectFailures: 0, + }, + expectedResult: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.peerState != (peerState{}) { + cab.peerCache.Add(testPeer, tt.peerState) + } + result := 
cab.ShouldProbePeer(testPeer) + assert.Equal(t, tt.expectedResult, result, + "expected ShouldProbePeer to return %v for case: %s", + tt.expectedResult, tt.name) + }) + } +} + +// Mock connection for testing +type mockConnection struct { + network.Conn + remoteAddr ma.Multiaddr +} + +func (mc *mockConnection) RemoteMultiaddr() ma.Multiaddr { + return mc.remoteAddr +} + +type mockHost struct { + host.Host + eventBus event.Bus +} + +func (mh *mockHost) Connect(ctx context.Context, pi peer.AddrInfo) error { + // Simulate connection failure + return fmt.Errorf("mock connection failure") +} + +// Network returns a mock network implementation +func (mh *mockHost) Network() network.Network { + return &mockNetwork{} +} + +// mockNetwork simulates a network with no connected peers +type mockNetwork struct { + network.Network +} + +func (mn *mockNetwork) Connectedness(p peer.ID) network.Connectedness { + // Simulate not connected state + return network.NotConnected +} + +func (mh *mockHost) EventBus() event.Bus { + return mh.eventBus +} diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 2c10c44..f09ac7d 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -37,6 +37,24 @@ Whether or not the Accelerated DHT is enabled or not. Default: `true` +### `SOMEGUY_CACHED_ADDR_BOOK` + +Whether the Cached Address Book is enabled. If disabled, someguy will not return cached addresses for peers without multiaddrs in `FindProviders`. + +Default: `true` + +### `SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL` + +The TTL for recently connected peers' multiaddrs in the cached address book. Only applies if `SOMEGUY_CACHED_ADDR_BOOK` is enabled. + +Default: `48h` + +### `SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING` + +Whether the Cached Address Book should actively probe peers in the cache to keep their multiaddrs up to date. Only applies if `SOMEGUY_CACHED_ADDR_BOOK` is enabled. + +Default: `true` + ### `SOMEGUY_PROVIDER_ENDPOINTS` Comma-separated list of other Delegated Routing V1 endpoints to proxy provider requests to.
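`SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL` is bound to a duration CLI flag in `main.go` further down in this diff, so its value is expected to be a Go duration string. The sketch below is only a minimal illustration of the accepted format, assuming the flag relies on the standard library's `time.ParseDuration`; the example values are illustrative, not recommendations.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Values like "30m" or "48h" are valid Go duration strings; an unparsable
	// value would be rejected when the flag is parsed at startup.
	for _, v := range []string{"30m", "48h"} {
		ttl, err := time.ParseDuration(v)
		if err != nil {
			panic(err)
		}
		fmt.Printf("SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL=%s parses to %s\n", v, ttl)
	}
}
```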
diff --git a/go.mod b/go.mod index 71c327e..8180fea 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,11 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 github.com/dustin/go-humanize v1.0.1 github.com/felixge/httpsnoop v1.0.4 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/ipfs/boxo v0.24.4-0.20241119003055-e38f236348d6 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-log/v2 v2.5.1 - github.com/libp2p/go-libp2p v0.37.0 + github.com/libp2p/go-libp2p v0.37.2 github.com/libp2p/go-libp2p-kad-dht v0.28.1 github.com/libp2p/go-libp2p-record v0.2.0 github.com/multiformats/go-multiaddr v0.13.0 @@ -90,10 +91,10 @@ require ( github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect - github.com/multiformats/go-multiaddr-dns v0.4.0 // indirect + github.com/multiformats/go-multiaddr-dns v0.4.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect - github.com/multiformats/go-multistream v0.5.0 // indirect + github.com/multiformats/go-multistream v0.6.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/ginkgo/v2 v2.20.2 // indirect @@ -123,7 +124,7 @@ require ( github.com/prometheus/common v0.60.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/quic-go/qpack v0.5.1 // indirect - github.com/quic-go/quic-go v0.48.1 // indirect + github.com/quic-go/quic-go v0.48.2 // indirect github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index 0d4312f..627eddd 100644 --- a/go.sum +++ b/go.sum @@ -186,6 +186,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= @@ -273,8 +275,8 @@ github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-flow-metrics v0.2.0 h1:EIZzjmeOE6c8Dav0sNv35vhZxATIXWZg6j/C08XmmDw= github.com/libp2p/go-flow-metrics v0.2.0/go.mod h1:st3qqfu8+pMfh+9Mzqb2GTiwrAGjIPszEjZmtksN8Jc= -github.com/libp2p/go-libp2p v0.37.0 h1:8K3mcZgwTldydMCNOiNi/ZJrOB9BY+GlI3UxYzxBi9A= -github.com/libp2p/go-libp2p v0.37.0/go.mod h1:GOKmSN99scDuYGTwaTbQPR8Nt6dxrK3ue7OjW2NGDg4= +github.com/libp2p/go-libp2p v0.37.2 h1:Irh+n9aDPTLt9wJYwtlHu6AhMUipbC1cGoJtOiBqI9c= +github.com/libp2p/go-libp2p v0.37.2/go.mod h1:M8CRRywYkqC6xKHdZ45hmqVckBj5z4mRLIMLWReypz8= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod 
h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= @@ -355,8 +357,8 @@ github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4= github.com/multiformats/go-multiaddr v0.13.0 h1:BCBzs61E3AGHcYYTv8dqRH43ZfyrqM8RXVPT8t13tLQ= github.com/multiformats/go-multiaddr v0.13.0/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII= -github.com/multiformats/go-multiaddr-dns v0.4.0 h1:P76EJ3qzBXpUXZ3twdCDx/kvagMsNo0LMFXpyms/zgU= -github.com/multiformats/go-multiaddr-dns v0.4.0/go.mod h1:7hfthtB4E4pQwirrz+J0CcDUfbWzTqEzVyYKKIKpgkc= +github.com/multiformats/go-multiaddr-dns v0.4.1 h1:whi/uCLbDS3mSEUMb1MsoT4uzUeZB0N32yzufqS0i5M= +github.com/multiformats/go-multiaddr-dns v0.4.1/go.mod h1:7hfthtB4E4pQwirrz+J0CcDUfbWzTqEzVyYKKIKpgkc= github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= github.com/multiformats/go-multiaddr-net v0.1.1/go.mod h1:5JNbcfBOP4dnhoZOv10JJVkJO0pCCEf8mTnipAo2UZQ= @@ -371,8 +373,8 @@ github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpK github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= -github.com/multiformats/go-multistream v0.5.0 h1:5htLSLl7lvJk3xx3qT/8Zm9J4K8vEOf/QGkvOGQAyiE= -github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dydlEqV3l6N3/GBsX6ILA= +github.com/multiformats/go-multistream v0.6.0 h1:ZaHKbsL404720283o4c/IHQXiS6gb8qAN5EIJ4PN5EA= +github.com/multiformats/go-multistream v0.6.0/go.mod h1:MOyoG5otO24cHIg8kf9QW2/NozURlkP/rvi2FQJyCPg= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= @@ -467,8 +469,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= -github.com/quic-go/quic-go v0.48.1 h1:y/8xmfWI9qmGTc+lBr4jKRUWLGSlSigv847ULJ4hYXA= -github.com/quic-go/quic-go v0.48.1/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= +github.com/quic-go/quic-go v0.48.2 h1:wsKXZPeGWpMpCGSWqOcqpW2wZYic/8T3aqiOID0/KWE= +github.com/quic-go/quic-go v0.48.2/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6u3sOT6pLa1kQ50ZVdm8BQFgJNA117cepZxtLIg= github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= diff --git a/main.go b/main.go index 8b0fc55..94488bb 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,25 @@ func main() { EnvVars: []string{"SOMEGUY_ACCELERATED_DHT"}, Usage: "run the accelerated DHT 
client", }, + &cli.BoolFlag{ + Name: "cached-addr-book", + Value: true, + EnvVars: []string{"SOMEGUY_CACHED_ADDR_BOOK"}, + Usage: "use a cached address book to improve provider lookup responses", + }, + &cli.BoolFlag{ + Name: "cached-addr-book-active-probing", + Value: true, + EnvVars: []string{"SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING"}, + Usage: "actively probe peers in cache to keep their multiaddrs up to date", + }, + &cli.DurationFlag{ + Name: "cached-addr-book-recent-ttl", + DefaultText: DefaultRecentlyConnectedAddrTTL.String(), + Value: DefaultRecentlyConnectedAddrTTL, + EnvVars: []string{"SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL"}, + Usage: "TTL for recently connected peers' multiaddrs in the cached address book", + }, &cli.StringSliceFlag{ Name: "provider-endpoints", Value: cli.NewStringSlice(cidContactEndpoint), @@ -115,8 +134,11 @@ func main() { }, Action: func(ctx *cli.Context) error { cfg := &config{ - listenAddress: ctx.String("listen-address"), - acceleratedDHTClient: ctx.Bool("accelerated-dht"), + listenAddress: ctx.String("listen-address"), + acceleratedDHTClient: ctx.Bool("accelerated-dht"), + cachedAddrBook: ctx.Bool("cached-addr-book"), + cachedAddrBookActiveProbing: ctx.Bool("cached-addr-book-active-probing"), + cachedAddrBookRecentTTL: ctx.Duration("cached-addr-book-recent-ttl"), contentEndpoints: ctx.StringSlice("provider-endpoints"), peerEndpoints: ctx.StringSlice("peer-endpoints"), diff --git a/server.go b/server.go index 2a50ec8..4052c81 100644 --- a/server.go +++ b/server.go @@ -40,8 +40,11 @@ func withRequestLogger(next http.Handler) http.Handler { } type config struct { - listenAddress string - acceleratedDHTClient bool + listenAddress string + acceleratedDHTClient bool + cachedAddrBook bool + cachedAddrBookActiveProbing bool + cachedAddrBookRecentTTL time.Duration contentEndpoints []string peerEndpoints []string @@ -80,17 +83,36 @@ func start(ctx context.Context, cfg *config) error { dhtRouting = standardDHT } - crRouters, err := getCombinedRouting(cfg.contentEndpoints, dhtRouting) + var cachedAddrBook *cachedAddrBook + + if cfg.cachedAddrBook { + fmt.Printf("Using cached address book to speed up provider discovery (active probing enabled: %t)\n", cfg.cachedAddrBookActiveProbing) + opts := []AddrBookOption{} + + if cfg.cachedAddrBookRecentTTL > 0 { + opts = append(opts, WithRecentlyConnectedTTL(cfg.cachedAddrBookRecentTTL)) + } + + opts = append(opts, WithActiveProbing(cfg.cachedAddrBookActiveProbing)) + + cachedAddrBook, err = newCachedAddrBook(opts...) + if err != nil { + return err + } + go cachedAddrBook.background(ctx, h) + } + + crRouters, err := getCombinedRouting(cfg.contentEndpoints, dhtRouting, cachedAddrBook) if err != nil { return err } - prRouters, err := getCombinedRouting(cfg.peerEndpoints, dhtRouting) + prRouters, err := getCombinedRouting(cfg.peerEndpoints, dhtRouting, cachedAddrBook) if err != nil { return err } - ipnsRouters, err := getCombinedRouting(cfg.ipnsEndpoints, dhtRouting) + ipnsRouters, err := getCombinedRouting(cfg.ipnsEndpoints, dhtRouting, cachedAddrBook) if err != nil { return err } @@ -109,11 +131,15 @@ func start(ctx context.Context, cfg *config) error { _ = tp.Shutdown(ctx) }() + handlerOpts := []server.Option{ + server.WithPrometheusRegistry(prometheus.DefaultRegisterer), + } + handler := server.Handler(&composableRouter{ providers: crRouters, peers: prRouters, ipns: ipnsRouters, - }, server.WithPrometheusRegistry(prometheus.DefaultRegisterer)) + }, handlerOpts...) // Add CORS. 
handler = cors.New(cors.Options{ @@ -216,12 +242,20 @@ func newHost(cfg *config) (host.Host, error) { return h, nil } -func getCombinedRouting(endpoints []string, dht routing.Routing) (router, error) { +func getCombinedRouting(endpoints []string, dht routing.Routing, cachedAddrBook *cachedAddrBook) (router, error) { + var dhtRouter router + + if cachedAddrBook != nil { + dhtRouter = NewCachedRouter(sanitizeRouter{libp2pRouter{routing: dht}}, cachedAddrBook) + } else { + dhtRouter = sanitizeRouter{libp2pRouter{routing: dht}} + } + if len(endpoints) == 0 { - return sanitizeRouter{libp2pRouter{routing: dht}}, nil + return dhtRouter, nil } - var routers []router + var delegatedRouters []router for _, endpoint := range endpoints { drclient, err := drclient.New(endpoint, @@ -233,12 +267,12 @@ func getCombinedRouting(endpoints []string, dht routing.Routing) (router, error) if err != nil { return nil, err } - routers = append(routers, clientRouter{Client: drclient}) + delegatedRouters = append(delegatedRouters, clientRouter{Client: drclient}) } - return sanitizeRouter{parallelRouter{ - routers: append(routers, libp2pRouter{routing: dht}), - }}, nil + return parallelRouter{ + routers: append(delegatedRouters, dhtRouter), + }, nil } func withTracingAndDebug(next http.Handler, authToken string) http.Handler { diff --git a/server_cached_router.go b/server_cached_router.go new file mode 100644 index 0000000..c66c6aa --- /dev/null +++ b/server_cached_router.go @@ -0,0 +1,230 @@ +package main + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + _ router = cachedRouter{} + + // peerAddrLookups allows us reason if/how effective peer addr cache is + peerAddrLookups = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "peer_addr_lookups", + Subsystem: "cached_router", + Namespace: name, + Help: "Number of peer addr info lookups per origin and cache state", + }, + []string{addrCacheStateLabel, addrQueryOriginLabel}, + ) + + errNoValueAvailable = errors.New("no value available") +) + +const ( + // cache=unused|hit|miss, indicates how effective cache is + addrCacheStateLabel = "cache" + addrCacheStateUnused = "unused" + addrCacheStateHit = "hit" + addrCacheStateMiss = "miss" + + // source=providers|peers indicates if query originated from provider or peer endpoint + addrQueryOriginLabel = "origin" + addrQueryOriginProviders = "providers" + addrQueryOriginPeers = "peers" + addrQueryOriginUnknown = "unknown" + + DispatchedFindPeersTimeout = time.Minute +) + +// cachedRouter wraps a router with the cachedAddrBook to retrieve cached addresses for peers without multiaddrs in FindProviders +type cachedRouter struct { + router + cachedAddrBook *cachedAddrBook +} + +func NewCachedRouter(router router, cab *cachedAddrBook) cachedRouter { + return cachedRouter{router, cab} +} + +func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + it, err := r.router.FindProviders(ctx, key, limit) + if err != nil { + return nil, err + } + + iter := NewCacheFallbackIter(it, r, ctx) + return iter, nil +} + +// FindPeers uses a simpler approach than FindProviders because we're dealing with a single PeerRecord, and there's +// no 
point in trying to dispatch an additional FindPeer call. +func (r cachedRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + it, err := r.router.FindPeers(ctx, pid, limit) + + if err == routing.ErrNotFound { + // ErrNotFound will be returned if either dialing the peer failed or the peer was not found + r.cachedAddrBook.RecordFailedConnection(pid) // record the failure used for probing/backoff purposes + return nil, routing.ErrNotFound + } + + if err != nil { + return nil, err + } + + // update the metrics to indicate that we didn't look up the cache for this lookup + peerAddrLookups.WithLabelValues(addrCacheStateUnused, addrQueryOriginPeers).Inc() + return it, nil +} + +// withAddrsFromCache returns the best list of addrs for specified [peer.ID]. +// It will consult cache ONLY if the addrs slice passed to it is empty. +func (r cachedRouter) withAddrsFromCache(queryOrigin string, pid peer.ID, addrs []types.Multiaddr) []types.Multiaddr { + // skip cache if we already have addrs + if len(addrs) > 0 { + peerAddrLookups.WithLabelValues(addrCacheStateUnused, queryOrigin).Inc() + return addrs + } + + cachedAddrs := r.cachedAddrBook.GetCachedAddrs(pid) // Get cached addresses + + if len(cachedAddrs) > 0 { + logger.Debugw("found cached addresses", "peer", pid, "cachedAddrs", cachedAddrs) + peerAddrLookups.WithLabelValues(addrCacheStateHit, queryOrigin).Inc() // Cache hit + return cachedAddrs + } else { + peerAddrLookups.WithLabelValues(addrCacheStateMiss, queryOrigin).Inc() // Cache miss + return nil + } +} + +var _ iter.ResultIter[types.Record] = &cacheFallbackIter{} + +type cacheFallbackIter struct { + sourceIter iter.ResultIter[types.Record] + current iter.Result[types.Record] + findPeersResult chan types.PeerRecord + router cachedRouter + ctx context.Context + ongoingLookups atomic.Int32 +} + +// NewCacheFallbackIter is a wrapper around a results iterator that will resolve peers with no addresses from cache and if no cached addresses, will look them up via FindPeers. +// It's a bit complex because it ensures we continue iterating without blocking on the FindPeers call. 
+func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context) *cacheFallbackIter { + iter := &cacheFallbackIter{ + sourceIter: sourceIter, + router: router, + ctx: ctx, + findPeersResult: make(chan types.PeerRecord), + ongoingLookups: atomic.Int32{}, + } + + return iter +} + +func (it *cacheFallbackIter) Next() bool { + // Try to get the next value from the source iterator first + if it.sourceIter.Next() { + val := it.sourceIter.Val() + handleRecord := func(id *peer.ID, record *types.PeerRecord) bool { + record.Addrs = it.router.withAddrsFromCache(addrQueryOriginProviders, *id, record.Addrs) + if len(record.Addrs) > 0 { + it.current = iter.Result[types.Record]{Val: record} + return true + } + logger.Infow("no cached addresses found in cacheFallbackIter, dispatching find peers", "peer", id) + + if it.router.cachedAddrBook.ShouldProbePeer(*id) { + it.ongoingLookups.Add(1) // important to increment before dispatchFindPeer + // If a record has no addrs, we dispatch a lookup to find addresses + go it.dispatchFindPeer(*record) + } + return it.Next() // Recursively call Next() to either read from sourceIter or wait for lookup result + } + + switch val.Val.GetSchema() { + case types.SchemaPeer: + if record, ok := val.Val.(*types.PeerRecord); ok { + return handleRecord(record.ID, record) + } + } + it.current = val // pass through unknown schemas + return true + } + + // If there are still ongoing lookups, wait for them + if it.ongoingLookups.Load() > 0 { + logger.Infow("waiting for ongoing find peers result") + select { + case result, ok := <-it.findPeersResult: + if !ok { + return false // channel closed. We're done + } + if len(result.Addrs) > 0 { // Only if the lookup returned a result and it has addrs + it.current = iter.Result[types.Record]{Val: &result} + return true + } else { + return it.Next() // recursively call Next() in case there are more ongoing lookups + } + case <-it.ctx.Done(): + return false + } + } + + return false +} + +func (it *cacheFallbackIter) Val() iter.Result[types.Record] { + if it.current.Val != nil || it.current.Err != nil { + return it.current + } + return iter.Result[types.Record]{Err: errNoValueAvailable} +} + +func (it *cacheFallbackIter) Close() error { + return it.sourceIter.Close() +} + +func (it *cacheFallbackIter) dispatchFindPeer(record types.PeerRecord) { + defer it.ongoingLookups.Add(-1) + + // Create a new context with a timeout that is independent of the main request context + // This is important because finishing (and determining whether this peer is reachable) the + // FindPeer will benefit other requests and keep the cache up to date. 
+ ctx, cancel := context.WithTimeout(context.Background(), DispatchedFindPeersTimeout) + defer cancel() + + peersIt, err := it.router.FindPeers(ctx, *record.ID, 1) + + // Check if the parent context is done before sending + if it.ctx.Err() != nil { + return // Exit early if the parent context is done + } + + if err != nil { + it.findPeersResult <- record // pass back the record with no addrs + return + } + peers, err := iter.ReadAllResults(peersIt) + if err != nil { + it.findPeersResult <- record // pass back the record with no addrs + return + } + if len(peers) > 0 { + // If we found the peer, pass back the result + it.findPeersResult <- *peers[0] + } else { + it.findPeersResult <- record // pass back the record with no addrs + } +} diff --git a/server_cached_router_test.go b/server_cached_router_test.go new file mode 100644 index 0000000..e0ec1c6 --- /dev/null +++ b/server_cached_router_test.go @@ -0,0 +1,418 @@ +package main + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockResultIter[T any] struct { + results []iter.Result[T] + current int + closed bool +} + +// Simple mock results iter that doesn't use channels +func newMockResultIter[T any](results []iter.Result[T]) *mockResultIter[T] { + return &mockResultIter[T]{ + results: results, + current: -1, + closed: false, + } +} + +func (m *mockResultIter[T]) Next() bool { + if m.closed { + return false + } + m.current++ + return m.current < len(m.results) +} + +func (m *mockResultIter[T]) Val() iter.Result[T] { + if m.current < 0 || m.current >= len(m.results) { + panic("Val() called without calling Next() or after Next() returned false") + } + return m.results[m.current] +} + +func (m *mockResultIter[T]) Close() error { + m.closed = true + return nil +} + +func TestCachedRouter(t *testing.T) { + t.Parallel() + + t.Run("FindProviders with cached addresses", func(t *testing.T) { + ctx := context.Background() + c := makeCID() + pid := peer.ID("test-peer") + + // Create mock router + mr := &mockRouter{} + mockIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}}, + }) + mr.On("FindProviders", mock.Anything, c, 10).Return(mockIter, nil) + + // Create cached address book with test addresses + cab, err := newCachedAddrBook() + require.NoError(t, err) + + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + cab.addrBook.AddAddrs(pid, []multiaddr.Multiaddr{publicAddr.Multiaddr}, time.Hour) + + // Create cached router + cr := NewCachedRouter(mr, cab) + + it, err := cr.FindProviders(ctx, c, 10) + require.NoError(t, err) + + results, err := iter.ReadAllResults(it) + require.NoError(t, err) + require.Len(t, results, 1) + + // Verify cached addresses were added + peerRecord := results[0].(*types.PeerRecord) + require.Equal(t, pid, *peerRecord.ID) + require.Len(t, peerRecord.Addrs, 1) + require.Equal(t, publicAddr.String(), peerRecord.Addrs[0].String()) + }) + + t.Run("Failed FindPeers with cached addresses does not return cached addresses", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + + // Create mock router that returns error + mr := &mockRouter{} + mr.On("FindPeers", mock.Anything, pid, 10).Return(nil, routing.ErrNotFound) + + // 
Create cached address book with test addresses + cab, err := newCachedAddrBook() + require.NoError(t, err) + + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + cab.addrBook.AddAddrs(pid, []multiaddr.Multiaddr{publicAddr.Multiaddr}, time.Hour) + + // Create cached router + cr := NewCachedRouter(mr, cab) + + _, err = cr.FindPeers(ctx, pid, 10) + require.ErrorIs(t, err, routing.ErrNotFound) + }) + + t.Run("FindPeers with cache miss", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + + // Create mock router + mr := &mockRouter{} + mockIter := newMockIter[*types.PeerRecord](ctx) + mr.On("FindPeers", mock.Anything, pid, 10).Return(mockIter, nil) + + // Create empty cached address book + cab, err := newCachedAddrBook() + require.NoError(t, err) + + // Create cached router + cr := NewCachedRouter(mr, cab) + + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Simulate peer response with addresses + go func() { + mockIter.ch <- iter.Result[*types.PeerRecord]{Val: &types.PeerRecord{ + Schema: "peer", + ID: &pid, + Addrs: []types.Multiaddr{publicAddr}, + }} + close(mockIter.ch) + }() + + it, err := cr.FindPeers(ctx, pid, 10) + require.NoError(t, err) + + results, err := iter.ReadAllResults(it) + require.NoError(t, err) + require.Len(t, results, 1) + + // Verify addresses from response were returned + require.Equal(t, pid, *results[0].ID) + require.Len(t, results[0].Addrs, 1) + require.Equal(t, publicAddr.String(), results[0].Addrs[0].String()) + }) + +} + +func TestCacheFallbackIter(t *testing.T) { + t.Parallel() + + t.Run("handles source iterator with no fallback needed", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Create source iterator with addresses + sourceIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}}, + }) + + // Create cached router + mr := &mockRouter{} + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + // Create fallback iterator + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + + // Read all results + results, err := iter.ReadAllResults(fallbackIter) + require.NoError(t, err) + require.Len(t, results, 1) + + peerRecord := results[0].(*types.PeerRecord) + require.Equal(t, pid, *peerRecord.ID) + require.Len(t, peerRecord.Addrs, 1) + require.Equal(t, publicAddr.String(), peerRecord.Addrs[0].String()) + }) + + t.Run("uses cache when source has no addresses", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Create source iterator without addresses + sourceIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}}, + }) + + // Create cached router with cached addresses + mr := &mockRouter{} + cab, err := newCachedAddrBook() + require.NoError(t, err) + cab.addrBook.AddAddrs(pid, []multiaddr.Multiaddr{publicAddr.Multiaddr}, time.Hour) + cr := NewCachedRouter(mr, cab) + + // Create fallback iterator + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + + // Read all results + results, err := iter.ReadAllResults(fallbackIter) + require.NoError(t, err) + require.Len(t, results, 1) + + peerRecord := results[0].(*types.PeerRecord) + require.Equal(t, pid, *peerRecord.ID) + require.Len(t, peerRecord.Addrs, 1) + 
require.Equal(t, publicAddr.String(), peerRecord.Addrs[0].String()) + }) + + t.Run("falls back to FindPeers when cache misses", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Create source iterator without addresses + sourceIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}}, + }) + + // Create mock router that returns addresses via FindPeers + mr := &mockRouter{} + findPeersIter := newMockResultIter([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}}, + }) + mr.On("FindPeers", mock.Anything, pid, 1).Return(findPeersIter, nil) + + // Create cached router with empty cache + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + // Create fallback iterator + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + + // Read all results + results, err := iter.ReadAllResults(fallbackIter) + require.NoError(t, err) + require.Len(t, results, 1) + + peerRecord := results[0].(*types.PeerRecord) + require.Equal(t, pid, *peerRecord.ID) + require.Len(t, peerRecord.Addrs, 1) + require.Equal(t, publicAddr.String(), peerRecord.Addrs[0].String()) + }) + + t.Run("handles context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + // Create source iterator that will block + sourceIter := newMockIter[types.Record](ctx) + + // Create cached router + mr := &mockRouter{} + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + // Create fallback iterator + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + + // Cancel context before sending any values + cancel() + + // Verify iterator stops + require.False(t, fallbackIter.Next()) + require.NoError(t, fallbackIter.Close()) + }) + + t.Run("handles multiple Val() calls correctly", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Create source iterator with a single record + sourceIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}}, + }) + + // Create cached router + mr := &mockRouter{} + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + // Create fallback iterator + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + + // First Next() should succeed + require.True(t, fallbackIter.Next()) + + // Multiple Val() calls should return the same value + val1 := fallbackIter.Val() + val2 := fallbackIter.Val() + require.Equal(t, val1, val2) + + // Value should be correct + peerRecord := val1.Val.(*types.PeerRecord) + require.Equal(t, pid, *peerRecord.ID) + require.Equal(t, publicAddr.String(), peerRecord.Addrs[0].String()) + + // After consuming the only value, Next() should return false + require.False(t, fallbackIter.Next()) + }) + + t.Run("handles context cancellation during lookup", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + pid := peer.ID("test-peer") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Create source iterator with record without addresses + sourceIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}}, + }) + + // 
Create mock router with FindPeers that returns + mr := &mockRouter{} + // mr.On("FindPeers", mock.Anything, pid, 1).Return(nil, routing.ErrNotFound) + findPeersIter := newMockResultIter([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}}, + }) + mr.On("FindPeers", mock.Anything, pid, 1).Return(findPeersIter, nil) + + // Create cached router + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + // Create fallback iterator + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + + // Cancel context during lookup + cancel() + + // First Next() should trigger lookup + require.False(t, fallbackIter.Next()) + }) + + t.Run("Fallback FindPeers with no addresses is omitted from result", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + + // Create source iterator without addresses + sourceIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}}, + }) + + // Create mock router that returns error from FindPeers + mr := &mockRouter{} + mr.On("FindPeers", mock.Anything, pid, 1).Return(nil, routing.ErrNotFound) + + // Create cached router with empty cache + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + // Create fallback iterator + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + + // Should still get a result, but with no addresses + results, err := iter.ReadAllResults(fallbackIter) + require.NoError(t, err) + require.Len(t, results, 0) + }) + + t.Run("handles multiple records with mixed address states", func(t *testing.T) { + ctx := context.Background() + pid1 := peer.ID("test-peer-1") + pid2 := peer.ID("test-peer-2") + pid3 := peer.ID("test-peer-3") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Create source iterator with multiple records + sourceIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid1, Addrs: []types.Multiaddr{publicAddr}}}, // Has address + {Val: &types.PeerRecord{Schema: "peer", ID: &pid2, Addrs: nil}}, // No address, will use cache + {Val: &types.PeerRecord{Schema: "peer", ID: &pid3, Addrs: nil}}, // No address, will need FindPeers + }) + + // Create mock router + mr := &mockRouter{} + findPeersIter := newMockResultIter([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid3, Addrs: []types.Multiaddr{publicAddr}}}, + }) + mr.On("FindPeers", mock.Anything, pid3, 1).Return(findPeersIter, nil) + + // Create cached router with some cached addresses + cab, err := newCachedAddrBook() + require.NoError(t, err) + cab.addrBook.AddAddrs(pid2, []multiaddr.Multiaddr{publicAddr.Multiaddr}, time.Hour) + cr := NewCachedRouter(mr, cab) + + // Create fallback iterator + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + + // Should get all records with addresses + results, err := iter.ReadAllResults(fallbackIter) + require.NoError(t, err) + require.Len(t, results, 3) + + // Verify each record has the expected addresses + for _, result := range results { + record := result.(*types.PeerRecord) + require.Len(t, record.Addrs, 1) + require.Equal(t, publicAddr.String(), record.Addrs[0].String()) + } + }) + +} diff --git a/server_test.go b/server_test.go index 99c10b2..42ee31f 100644 --- a/server_test.go +++ b/server_test.go @@ -10,15 +10,15 @@ func TestGetCombinedRouting(t *testing.T) { t.Parallel() // Check of the 
result of get combined routing is a sanitize router. - v, err := getCombinedRouting(nil, &bundledDHT{}) + v, err := getCombinedRouting(nil, &bundledDHT{}, nil) require.NoError(t, err) require.IsType(t, sanitizeRouter{}, v) - v, err = getCombinedRouting([]string{"https://example.com/"}, nil) + v, err = getCombinedRouting([]string{"https://example.com/"}, nil, nil) require.NoError(t, err) - require.IsType(t, sanitizeRouter{}, v) + require.IsType(t, parallelRouter{}, v) - v, err = getCombinedRouting([]string{"https://example.com/"}, &bundledDHT{}) + v, err = getCombinedRouting([]string{"https://example.com/"}, &bundledDHT{}, nil) require.NoError(t, err) - require.IsType(t, sanitizeRouter{}, v) + require.IsType(t, parallelRouter{}, v) }
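As a closing note on the probing behaviour added in `cached_addr_book.go`: the backoff computed by `ShouldProbePeer` starts at `PeerProbeThreshold` (1 hour) after the first failure, doubles with every additional failure, and is clamped to `MaxBackoffDuration` (48h); once a peer keeps failing past that window, `RecordFailedConnection` drops it from the cache and address book. The sketch below is a minimal, self-contained illustration of that schedule; the lower-case constants and the `backoff` helper are illustrative and not part of this change.

```go
package main

import (
	"fmt"
	"time"
)

// Constants mirroring cached_addr_book.go in this PR: probing backs off starting
// from PeerProbeThreshold (1h) and is capped at MaxBackoffDuration (48h, the
// Amino DHT provide validity).
const (
	peerProbeThreshold = time.Hour
	maxBackoffDuration = 48 * time.Hour
)

// backoff reproduces the calculation in ShouldProbePeer:
// 2^(connectFailures-1) * PeerProbeThreshold, clamped to MaxBackoffDuration.
func backoff(connectFailures uint) time.Duration {
	if connectFailures == 0 {
		return peerProbeThreshold
	}
	d := peerProbeThreshold * time.Duration(1<<(connectFailures-1))
	return min(d, maxBackoffDuration)
}

func main() {
	// The schedule works out to 1h, 2h, 4h, 8h, 16h, 32h, and then a capped 48h
	// from the seventh consecutive failure onwards.
	for failures := uint(1); failures <= 8; failures++ {
		fmt.Printf("after %d failed probes: next probe in %s\n", failures, backoff(failures))
	}
}
```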