Skip to content

Commit

Permalink
P2P: Introduce profiles for hybridRelay, hybridArchival, and hybridCl…
Browse files Browse the repository at this point in the history
…ient. (#6062)

Co-authored-by: chris erway <[email protected]>
Co-authored-by: Pavel Zbitskiy <[email protected]>
  • Loading branch information
3 people authored Jul 18, 2024
1 parent 48a539f commit 1fa0ef7
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 13 deletions.
70 changes: 62 additions & 8 deletions cmd/algocfg/profileCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ var (
},
}

relay = configUpdater{
description: "Relay consensus messages across the network and support catchup.",
wsRelay = configUpdater{
description: "Relay consensus messages across the ws network and support recent catchup.",
updateFunc: func(cfg config.Local) config.Local {
cfg.MaxBlockHistoryLookback = 22000 // Enough to support 2 catchpoints with some wiggle room for nodes to catch up from the older one
cfg.CatchpointFileHistoryLength = 3
Expand All @@ -80,7 +80,7 @@ var (
}

archival = configUpdater{
description: "Store the full chain history and support catchup.",
description: "Store the full chain history and support full catchup.",
updateFunc: func(cfg config.Local) config.Local {
cfg.Archival = true
cfg.EnableLedgerService = true
Expand All @@ -91,13 +91,67 @@ var (
},
}

hybridRelay = configUpdater{
description: "Relay consensus messages across both ws and p2p networks, also support recent catchup.",
updateFunc: func(cfg config.Local) config.Local {
// WS relay config defaults
cfg.MaxBlockHistoryLookback = 22000 // Enough to support 2 catchpoints with some wiggle room for nodes to catch up from the older one
cfg.CatchpointFileHistoryLength = 3
cfg.CatchpointTracking = 2
cfg.EnableLedgerService = true
cfg.EnableBlockService = true
cfg.NetAddress = ":4160"
// This should be set to the public address of the node if public access is desired
cfg.PublicAddress = config.PlaceholderPublicAddress

// P2P config defaults
cfg.EnableP2PHybridMode = true
cfg.P2PNetAddress = ":4190"
cfg.EnableDHTProviders = true
return cfg
},
}

hybridArchival = configUpdater{
description: "Store the full chain history, support full catchup, P2P enabled, discoverable via DHT.",
updateFunc: func(cfg config.Local) config.Local {
cfg.Archival = true
cfg.EnableLedgerService = true
cfg.EnableBlockService = true
cfg.NetAddress = ":4160"
cfg.EnableGossipService = false
// This should be set to the public address of the node
cfg.PublicAddress = config.PlaceholderPublicAddress

// P2P config defaults
cfg.EnableP2PHybridMode = true
cfg.P2PNetAddress = ":4190"
cfg.EnableDHTProviders = true
return cfg
},
}

hybridClient = configUpdater{
description: "Participate in consensus or simply ensure chain health by validating blocks and supporting P2P traffic propagation.",
updateFunc: func(cfg config.Local) config.Local {

// P2P config defaults
cfg.EnableP2PHybridMode = true
cfg.EnableDHTProviders = true
return cfg
},
}

// profileNames are the supported pre-configurations of config values
profileNames = map[string]configUpdater{
"participation": participation,
"conduit": conduit,
"relay": relay,
"archival": archival,
"development": development,
"participation": participation,
"conduit": conduit,
"wsRelay": wsRelay,
"archival": archival,
"development": development,
"hybridRelay": hybridRelay,
"hybridArchival": hybridArchival,
"hybridClient": hybridClient,
}

forceUpdate bool
Expand Down
62 changes: 62 additions & 0 deletions cmd/algocfg/profileCommand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package main

import (
"github.com/algorand/go-algorand/config"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -62,4 +63,65 @@ func Test_getConfigForArg(t *testing.T) {
require.Equal(t, ":4160", cfg.NetAddress)
require.False(t, cfg.EnableGossipService)
})

t.Run("valid config test hybrid relay", func(t *testing.T) {
t.Parallel()
cfg, err := getConfigForArg("hybridRelay")
require.NoError(t, err)

require.False(t, cfg.Archival)
require.Equal(t, uint64(22000), cfg.MaxBlockHistoryLookback)
require.Equal(t, 3, cfg.CatchpointFileHistoryLength)
require.Equal(t, int64(2), cfg.CatchpointTracking)
require.True(t, cfg.EnableLedgerService)
require.True(t, cfg.EnableBlockService)
require.Equal(t, ":4160", cfg.NetAddress)
require.True(t, cfg.EnableGossipService)
require.Equal(t, config.PlaceholderPublicAddress, cfg.PublicAddress)

require.True(t, cfg.EnableP2PHybridMode)
require.Equal(t, ":4190", cfg.P2PNetAddress)
require.True(t, cfg.EnableDHTProviders)
})

t.Run("valid config test hybrid archival", func(t *testing.T) {
t.Parallel()
cfg, err := getConfigForArg("hybridArchival")
require.NoError(t, err)

require.True(t, cfg.Archival)
require.Equal(t, uint64(0), cfg.MaxBlockHistoryLookback)
require.Equal(t, 365, cfg.CatchpointFileHistoryLength)
require.Equal(t, int64(0), cfg.CatchpointTracking)
require.True(t, cfg.EnableLedgerService)
require.True(t, cfg.EnableBlockService)
require.Equal(t, ":4160", cfg.NetAddress)
require.False(t, cfg.EnableGossipService)
require.Equal(t, config.PlaceholderPublicAddress, cfg.PublicAddress)

require.True(t, cfg.EnableP2PHybridMode)
require.Equal(t, ":4190", cfg.P2PNetAddress)
require.True(t, cfg.EnableDHTProviders)
})

t.Run("valid config test hybrid client", func(t *testing.T) {
t.Parallel()
cfg, err := getConfigForArg("hybridClient")
require.NoError(t, err)

require.False(t, cfg.Archival)
require.Equal(t, uint64(0), cfg.MaxBlockHistoryLookback)
require.Equal(t, 365, cfg.CatchpointFileHistoryLength)
require.Equal(t, int64(0), cfg.CatchpointTracking)
require.False(t, cfg.EnableLedgerService)
require.False(t, cfg.EnableBlockService)
require.Empty(t, cfg.NetAddress)
// True because it is the default value, net address is blank so has no effect in practice
require.True(t, cfg.EnableGossipService)
require.Equal(t, "", cfg.PublicAddress)

require.True(t, cfg.EnableP2PHybridMode)
require.Equal(t, "", cfg.P2PNetAddress)
require.True(t, cfg.EnableDHTProviders)
})
}
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ const CatchpointTrackingModeTracked = 1
// as long as CatchpointInterval > 0
const CatchpointTrackingModeStored = 2

// PlaceholderPublicAddress is a placeholder for the public address generated in certain profiles
const PlaceholderPublicAddress = "PLEASE_SET_ME"

// LoadConfigFromDisk returns a Local config structure based on merging the defaults
// with settings loaded from the config file from the custom dir. If the custom file
// cannot be loaded, the default config is returned (with the error from loading the
Expand Down Expand Up @@ -145,6 +148,11 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) {

err = loadConfig(f, &source)

// If the PublicAddress in config file has the PlaceholderPublicAddress, treat it as if it were empty
if source.PublicAddress == PlaceholderPublicAddress {
source.PublicAddress = ""
}

if source.NetAddress != "" {
source.EnableLedgerService = true
source.EnableBlockService = true
Expand Down
8 changes: 7 additions & 1 deletion network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host.
listenAddr = parsedListenAddr
}
} else {
listenAddr = "/ip4/0.0.0.0/tcp/0"
// don't listen if NetAddress is not set.
listenAddr = ""
}

var disableMetrics = func(cfg *libp2p.Config) error { return nil }
Expand Down Expand Up @@ -163,6 +164,11 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho

// Start starts the P2P service
func (s *serviceImpl) Start() error {
if s.listenAddr == "" {
// don't listen if no listen address configured
return nil
}

listenAddr, err := multiaddr.NewMultiaddr(s.listenAddr)
if err != nil {
s.log.Errorf("failed to create multiaddress: %s", err)
Expand Down
8 changes: 6 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,11 @@ func (n *P2PNetwork) Start() error {
go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n, "network", "P2PNetwork")
}

n.wg.Add(1)
go n.httpdThread()
// start the HTTP server if configured to listen
if n.config.NetAddress != "" {
n.wg.Add(1)
go n.httpdThread()
}

n.wg.Add(1)
go n.broadcaster.broadcastThread(&n.wg, n, "network", "P2PNetwork")
Expand Down Expand Up @@ -471,6 +474,7 @@ func (n *P2PNetwork) meshThread() {

func (n *P2PNetwork) httpdThread() {
defer n.wg.Done()

err := n.httpServer.Serve()
if err != nil {
n.log.Errorf("Error serving libp2phttp: %v", err)
Expand Down
16 changes: 14 additions & 2 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestP2PSubmitTX(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.ForceFetchTransactions = true
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.ForceFetchTransactions = true
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -189,6 +191,8 @@ func TestP2PSubmitTXNoGossip(t *testing.T) {

// run netC in NPN mode (no relay => no gossip sup => no TX receiving)
cfg.ForceFetchTransactions = false
// Have to unset NetAddress to get IsGossipServer to return false
cfg.NetAddress = ""
netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
netC.Start()
Expand Down Expand Up @@ -253,6 +257,7 @@ func TestP2PSubmitWS(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -584,6 +589,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.NetAddress = "127.0.0.1:0"
cfg.EnableDHTProviders = true
log := logging.TestingLog(t)

Expand Down Expand Up @@ -744,6 +750,7 @@ func TestP2PHTTPHandler(t *testing.T) {
cfg := config.GetDefaultLocal()
cfg.EnableDHTProviders = true
cfg.GossipFanout = 1
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)

netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
Expand Down Expand Up @@ -812,6 +819,7 @@ func TestP2PRelay(t *testing.T) {
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
cfg.ForceFetchTransactions = true
cfg.BaseLoggerDebugLevel = 5
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
log.Debugln("Starting netA")
netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
Expand All @@ -829,6 +837,8 @@ func TestP2PRelay(t *testing.T) {
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

// Explicitly unset NetAddress for netB
cfg.NetAddress = ""
log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses)
netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -880,8 +890,8 @@ func TestP2PRelay(t *testing.T) {
counterHandler, counterDone := makeCounterHandler(1, &counter, nil)
netA.RegisterProcessors(counterHandler)

// send 5 messages from both netB to netA
// since there is no node with listening address set => no messages should be received
// send 5 messages from netB to netA
// since relaying is disabled on net B => no messages should be received by net A
for i := 0; i < 5; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1031,6 +1041,7 @@ func TestP2PWantTXGossip(t *testing.T) {
net.wantTXGossip.Store(true)
net.nodeInfo = &nopeNodeInfo{}
net.config.ForceFetchTransactions = false
net.config.NetAddress = ""
net.relayMessages = false
net.OnNetworkAdvance()
require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond)
Expand All @@ -1048,6 +1059,7 @@ func TestP2PWantTXGossip(t *testing.T) {
net.wantTXGossip.Store(false)
net.nodeInfo = &nopeNodeInfo{}
net.config.ForceFetchTransactions = false
net.config.NetAddress = ""
net.relayMessages = true
net.OnNetworkAdvance()
require.Eventually(t, func() bool { return mockService.count.Load() == 3 }, 1*time.Second, 50*time.Millisecond)
Expand Down

0 comments on commit 1fa0ef7

Please sign in to comment.