Skip to content

Commit

Permalink
p2p: fix connection deduplication in hybrid mode (#6082)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Jul 26, 2024
1 parent 2b34eda commit adaecde
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
1 change: 1 addition & 0 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea
networkPeerIdentityDisconnect.Inc(nil)
n.log.With("remote", addr).With("local", localAddr).Warn("peer deduplicated before adding because the identity is already known")
stream.Close()
return
}

wsp.init(n.config, outgoingMessagesBufferSize)
Expand Down
55 changes: 54 additions & 1 deletion network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/algorand/go-algorand/config"
algocrypto "github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/limitcaller"
"github.com/algorand/go-algorand/network/p2p"
Expand Down Expand Up @@ -1081,7 +1082,7 @@ func TestP2PWantTXGossip(t *testing.T) {
require.True(t, net.wantTXGossip.Load())
}

func TestMergeP2PAddrInfoResolvedAddresses(t *testing.T) {
func TestP2PMergeAddrInfoResolvedAddresses(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

Expand Down Expand Up @@ -1154,3 +1155,55 @@ func TestMergeP2PAddrInfoResolvedAddresses(t *testing.T) {
})
}
}

// TestP2PwsStreamHandlerDedup checks that the wsStreamHandler detects duplicate connections
// and does not add a new wePeer for it.
func TestP2PwsStreamHandlerDedup(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, &identityOpts{tracker: NewIdentityTracker()})
require.NoError(t, err)
err = netA.Start()
require.NoError(t, err)
defer netA.Stop()

peerInfoA := netA.service.AddrInfo()
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
require.NoError(t, err)
require.NotZero(t, addrsA[0])

multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}
netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, &identityOpts{tracker: NewIdentityTracker()})
require.NoError(t, err)

// now say netA's identity tracker knows about netB's peerID
var netIdentPeerID algocrypto.PublicKey
p2pPeerPubKey, err := netB.service.ID().ExtractPublicKey()
require.NoError(t, err)

b, err := p2pPeerPubKey.Raw()
require.NoError(t, err)
netIdentPeerID = algocrypto.PublicKey(b)
wsp := &wsPeer{
identity: netIdentPeerID,
}
netA.identityTracker.setIdentity(wsp)
networkPeerIdentityDisconnectInitial := networkPeerIdentityDisconnect.GetUint64Value()

// start network and ensure dedup happens
err = netB.Start()
require.NoError(t, err)
defer netB.Stop()

require.Eventually(t, func() bool {
return networkPeerIdentityDisconnect.GetUint64Value() == networkPeerIdentityDisconnectInitial+1
}, 2*time.Second, 50*time.Millisecond)

require.False(t, netA.hasPeers())
require.False(t, netB.hasPeers())
}

0 comments on commit adaecde

Please sign in to comment.