Skip to content

Commit

Permalink
p2p: support EnableGossipService in p2p streams (#6073)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Jul 29, 2024
1 parent e697ae8 commit 578684e
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 35 deletions.
2 changes: 1 addition & 1 deletion network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error)
// MakeService creates a P2P service instance
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) {

sm := makeStreamManager(ctx, log, h, wsStreamHandler)
sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService)
h.Network().Notify(sm)
h.SetStreamHandler(AlgorandWsProtocol, sm.streamHandler)

Expand Down
58 changes: 24 additions & 34 deletions network/p2p/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (

// streamManager implements network.Notifiee to create and manage streams for use with non-gossipsub protocols.
type streamManager struct {
ctx context.Context
log logging.Logger
host host.Host
handler StreamHandler
ctx context.Context
log logging.Logger
host host.Host
handler StreamHandler
allowIncomingGossip bool

streams map[peer.ID]network.Stream
streamsLock deadlock.Mutex
Expand All @@ -42,18 +43,25 @@ type streamManager struct {
// StreamHandler is called when a new bidirectional stream for a given protocol and peer is opened.
type StreamHandler func(ctx context.Context, pid peer.ID, s network.Stream, incoming bool)

func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler) *streamManager {
func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler, allowIncomingGossip bool) *streamManager {
return &streamManager{
ctx: ctx,
log: log,
host: h,
handler: handler,
streams: make(map[peer.ID]network.Stream),
ctx: ctx,
log: log,
host: h,
handler: handler,
allowIncomingGossip: allowIncomingGossip,
streams: make(map[peer.ID]network.Stream),
}
}

// streamHandler is called by libp2p when a new stream is accepted
func (n *streamManager) streamHandler(stream network.Stream) {
if stream.Conn().Stat().Direction == network.DirInbound && !n.allowIncomingGossip {
n.log.Debugf("rejecting stream from incoming connection from %s", stream.Conn().RemotePeer().String())
stream.Close()
return
}

n.streamsLock.Lock()
defer n.streamsLock.Unlock()

Expand All @@ -74,15 +82,7 @@ func (n *streamManager) streamHandler(stream network.Stream) {
}
n.streams[stream.Conn().RemotePeer()] = stream

// streamHandler is supposed to be called for accepted streams, so we expect incoming here
incoming := stream.Conn().Stat().Direction == network.DirInbound
if !incoming {
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("Unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
} else {
n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
}
}
n.handler(n.ctx, remotePeer, stream, incoming)
return
}
Expand All @@ -92,20 +92,18 @@ func (n *streamManager) streamHandler(stream network.Stream) {
}
// no old stream
n.streams[stream.Conn().RemotePeer()] = stream
// streamHandler is supposed to be called for accepted streams, so we expect incoming here
incoming := stream.Conn().Stat().Direction == network.DirInbound
if !incoming {
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("streamHandler: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
} else {
n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
}
}
n.handler(n.ctx, remotePeer, stream, incoming)
}

// Connected is called when a connection is opened
// for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections.
func (n *streamManager) Connected(net network.Network, conn network.Conn) {
if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip {
n.log.Debugf("ignoring incoming connection from %s", conn.RemotePeer().String())
return
}

remotePeer := conn.RemotePeer()
localPeer := n.host.ID()

Expand Down Expand Up @@ -138,15 +136,7 @@ func (n *streamManager) Connected(net network.Network, conn network.Conn) {
needUnlock = false
n.streamsLock.Unlock()

// a new stream created above, expected direction is outbound
incoming := stream.Conn().Stat().Direction == network.DirInbound
if incoming {
n.log.Warnf("Unexpected incoming stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
} else {
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("Connected: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
}
}
n.handler(n.ctx, remotePeer, stream, incoming)
}

Expand Down
138 changes: 138 additions & 0 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,3 +1207,141 @@ func TestP2PwsStreamHandlerDedup(t *testing.T) {
require.False(t, netA.hasPeers())
require.False(t, netB.hasPeers())
}

// TestP2PEnableGossipService_NodeDisable ensures that a node with EnableGossipService=false
// still can participate in the network by sending and receiving messages.
func TestP2PEnableGossipService_NodeDisable(t *testing.T) {
partitiontest.PartitionTest(t)

log := logging.TestingLog(t)

// prepare configs
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses

relayCfg := cfg
relayCfg.NetAddress = "127.0.0.1:0"

nodeCfg := cfg
nodeCfg.EnableGossipService = false
nodeCfg2 := nodeCfg
nodeCfg2.NetAddress = "127.0.0.1:0"

tests := []struct {
name string
relayCfg config.Local
nodeCfg config.Local
}{
{"non-listening-node", relayCfg, nodeCfg},
{"listening-node", relayCfg, nodeCfg2},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
relayCfg := test.relayCfg
netA, err := NewP2PNetwork(log, relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
netA.Start()
defer netA.Stop()

peerInfoA := netA.service.AddrInfo()
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
require.NoError(t, err)
require.NotZero(t, addrsA[0])
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

// start netB with gossip service disabled
nodeCfg := test.nodeCfg
netB, err := NewP2PNetwork(log, nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
netB.Start()
defer netB.Stop()

require.Eventually(t, func() bool {
return netA.hasPeers() && netB.hasPeers()
}, 1*time.Second, 50*time.Millisecond)

testTag := protocol.AgreementVoteTag

var handlerCountA atomic.Uint32
passThroughHandlerA := []TaggedMessageHandler{
{Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
handlerCountA.Add(1)
return OutgoingMessage{Action: Broadcast}
})},
}
var handlerCountB atomic.Uint32
passThroughHandlerB := []TaggedMessageHandler{
{Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
handlerCountB.Add(1)
return OutgoingMessage{Action: Broadcast}
})},
}
netA.RegisterHandlers(passThroughHandlerA)
netB.RegisterHandlers(passThroughHandlerB)

// send messages from both nodes to each other and confirm they are received.
for i := 0; i < 10; i++ {
err = netA.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from A %d", i)), false, nil)
require.NoError(t, err)
err = netB.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from B %d", i)), false, nil)
require.NoError(t, err)
}

require.Eventually(
t,
func() bool {
return handlerCountA.Load() == 10 && handlerCountB.Load() == 10
},
2*time.Second,
50*time.Millisecond,
)
})
}
}

// TestP2PEnableGossipService_BothDisable checks if both relay and node have EnableGossipService=false
// they do not gossip to each other.
//
// Note, this test checks a configuration where node A (relay) does not know about node B,
// and node B is configured to connect to A, and this scenario rejecting logic is guaranteed to work.
func TestP2PEnableGossipService_BothDisable(t *testing.T) {
partitiontest.PartitionTest(t)

log := logging.TestingLog(t)

// prepare configs
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
cfg.EnableGossipService = false // disable gossip service by default

relayCfg := cfg
relayCfg.NetAddress = "127.0.0.1:0"

netA, err := NewP2PNetwork(log.With("net", "netA"), relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
netA.Start()
defer netA.Stop()

peerInfoA := netA.service.AddrInfo()
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
require.NoError(t, err)
require.NotZero(t, addrsA[0])
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

nodeCfg := cfg
nodeCfg.NetAddress = ""

netB, err := NewP2PNetwork(log.With("net", "netB"), nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
netB.Start()
defer netB.Stop()

require.Eventually(t, func() bool {
return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0
}, 1*time.Second, 50*time.Millisecond)

require.False(t, netA.hasPeers())
require.False(t, netB.hasPeers())
}

0 comments on commit 578684e

Please sign in to comment.