Skip to content

Commit

Permalink
Merge networking peer address updates fix (#368)
Browse files Browse the repository at this point in the history
* Propagate server-end-point address information in peer discovery (#366)

* Add support for exchanging public enode endpoints

* Fix integration of end-point request protocol

* Fix naming of fields and functions

* Fix the version of the client (#367)

---------

Co-authored-by: Herbert <[email protected]>
  • Loading branch information
jmpike and HerbertJordan authored Dec 2, 2024
1 parent 0b774c3 commit 1933d89
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 3 deletions.
2 changes: 2 additions & 0 deletions gossip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
MaxRandomTxHashesSend int
RandomTxHashesSendPeriod time.Duration
PeerInfoCollectionPeriod time.Duration
PeerEndPointUpdatePeriod time.Duration

PeerCache PeerCacheConfig
}
Expand Down Expand Up @@ -184,6 +185,7 @@ func DefaultConfig(scale cachescale.Func) Config {
MaxRandomTxHashesSend: 250, // match softLimitItems to fit into one message
RandomTxHashesSendPeriod: 1 * time.Second,
PeerInfoCollectionPeriod: 3 * time.Second,
PeerEndPointUpdatePeriod: 1 * time.Minute,
PeerCache: DefaultPeerCacheConfig(scale),
},

Expand Down
48 changes: 47 additions & 1 deletion gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ type handlerConfig struct {
s *Store
process processCallback
localId enode.ID

localEndPointSource LocalEndPointSource
}

type LocalEndPointSource interface {
GetLocalEndPoint() *enode.Node
}

type handler struct {
Expand Down Expand Up @@ -145,6 +151,8 @@ type handler struct {
connectionAdvisor topology.ConnectionAdvisor
nextSuggestedPeer chan *enode.Node

localEndPointSource LocalEndPointSource

logger.Instance
}

Expand Down Expand Up @@ -174,6 +182,8 @@ func newHandler(
connectionAdvisor: topology.NewConnectionAdvisor(c.localId),
nextSuggestedPeer: make(chan *enode.Node, 1),

localEndPointSource: c.localEndPointSource,

Instance: logger.New("PM"),
}
h.started.Add(1)
Expand Down Expand Up @@ -865,8 +875,12 @@ func (h *handler) handleMsg(p *peer) error {
if peer.Useless() {
continue
}
info := peer.endPoint.Load()
if info == nil {
continue
}
infos = append(infos, peerInfo{
Enode: peer.Node().String(),
Enode: info.enode.String(),
})
}
err := p2p.Send(p.rw, PeerInfosMsg, peerInfoMsg{
Expand Down Expand Up @@ -894,6 +908,34 @@ func (h *handler) handleMsg(p *peer) error {

h.connectionAdvisor.UpdatePeers(p.ID(), reportedPeers)

case msg.Code == GetEndPointMsg:
source := h.localEndPointSource
if source == nil {
return nil
}
enode := source.GetLocalEndPoint()
if enode == nil {
return nil
}
if err := p2p.Send(p.rw, EndPointUpdateMsg, enode.String()); err != nil {
return err
}

case msg.Code == EndPointUpdateMsg:
var encoded string
if err := msg.Decode(&encoded); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
var enode enode.Node
if err := enode.UnmarshalText([]byte(encoded)); err != nil {
h.Log.Warn("Failed to unmarshal enode", "enode", encoded, "err", err)
} else {
p.endPoint.Store(&peerEndPointInfo{
enode: enode,
timestamp: time.Now(),
})
}

default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
Expand Down Expand Up @@ -1103,6 +1145,10 @@ func (h *handler) peerInfoCollectionLoop(stop <-chan struct{}) {
// Request updated peer information from current peers.
peers := h.peers.List()
for _, peer := range peers {
// If we do not have the peer's end-point or it is too old, request it.
if info := peer.endPoint.Load(); info == nil || time.Since(info.timestamp) > h.config.Protocol.PeerEndPointUpdatePeriod {
peer.SendEndPointUpdateRequest()
}
peer.SendPeerInfoRequest()
}

Expand Down
22 changes: 21 additions & 1 deletion gossip/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"

"github.com/Fantom-foundation/go-opera/gossip/protocols/dag/dagstream"
Expand Down Expand Up @@ -71,6 +72,13 @@ type peer struct {
useless uint32

sync.RWMutex

endPoint atomic.Pointer[peerEndPointInfo]
}

type peerEndPointInfo struct {
enode enode.Node
timestamp time.Time
}

func (p *peer) Useless() bool {
Expand Down Expand Up @@ -542,12 +550,24 @@ func (p *peer) SendPeerInfoRequest() error {
// If the peer doesn't support the peer info protocol, don't bother
// sending the request. This request would lead to a disconnect
// if the peer doesn't understand it.
if !p.Peer.RunningCap(ProtocolName, []uint{_Sonic_64}) {
if !p.Peer.RunningCap(ProtocolName, []uint{_Sonic_64, _Sonic_65}) {
return nil
}
return p2p.Send(p.rw, GetPeerInfosMsg, struct{}{})
}

// SendEndPointUpdateRequest sends a request to the peer asking for the peer's
// public enode address to be used to establish a connection to this peer.
func (p *peer) SendEndPointUpdateRequest() error {
// If the peer doesn't support version 65 of this protocol, don't bother
// sending the request. This request would lead to a disconnect
// if the peer doesn't understand it.
if !p.Peer.RunningCap(ProtocolName, []uint{_Sonic_65}) {
return nil
}
return p2p.Send(p.rw, GetEndPointMsg, struct{}{})
}

// String implements fmt.Stringer.
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,
Expand Down
8 changes: 8 additions & 0 deletions gossip/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@ import (
const (
_FTM62 = 62
_Sonic_64 = 64
_Sonic_65 = 65
)

// ProtocolName is the official short name of the protocol used during capability negotiation.
const ProtocolName = "opera"

// ProtocolVersions are the supported versions of the protocol (first is primary).
var ProtocolVersions = []uint{
_Sonic_65,
_Sonic_64,
_FTM62,
}

// protocolLengths are the number of implemented message corresponding to different protocol versions.
var protocolLengths = map[uint]uint64{
_Sonic_65: EndPointUpdateMsg + 1,
_Sonic_64: PeerInfosMsg + 1,
_FTM62: EventsStreamResponse + 1,
}
Expand Down Expand Up @@ -70,6 +73,11 @@ const (
GetPeerInfosMsg = 10
// Contains the list of known peers and their information.
PeerInfosMsg = 11

// Request the enode of the peer identifying its public end-point.
GetEndPointMsg = 12
// Contains the enode including the public end-point of the sender.
EndPointUpdateMsg = 13
)

type errCode int
Expand Down
9 changes: 9 additions & 0 deletions gossip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func newService(config Config, store *Store, blockProc BlockProc, engine lachesi
},
SwitchEpochTo: svc.SwitchEpochTo,
},
localEndPointSource: localEndPointSource{svc},
})
if err != nil {
return nil, err
Expand All @@ -271,6 +272,14 @@ func newService(config Config, store *Store, blockProc BlockProc, engine lachesi
return svc, nil
}

type localEndPointSource struct {
service *Service
}

func (s localEndPointSource) GetLocalEndPoint() *enode.Node {
return s.service.p2pServer.LocalNode().Node()
}

// makeCheckers builds event checkers
func makeCheckers(heavyCheckCfg heavycheck.Config, txSigner types.Signer, heavyCheckReader *HeavyCheckReader, gasPowerCheckReader *GasPowerCheckReader, store *Store) *eventcheck.Checkers {
// create signatures checker
Expand Down
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
const (
VersionMajor = 2 // Major version component of the current release
VersionMinor = 0 // Minor version component of the current release
VersionPatch = 0 // Patch version component of the current release
VersionPatch = 1 // Patch version component of the current release
VersionMeta = "" // Version metadata to append to the version string
)

Expand Down

0 comments on commit 1933d89

Please sign in to comment.