diff --git a/gossip/config.go b/gossip/config.go index 904083516..e89958bfb 100644 --- a/gossip/config.go +++ b/gossip/config.go @@ -49,6 +49,7 @@ type ( MaxRandomTxHashesSend int RandomTxHashesSendPeriod time.Duration PeerInfoCollectionPeriod time.Duration + PeerEndPointUpdatePeriod time.Duration PeerCache PeerCacheConfig } @@ -184,6 +185,7 @@ func DefaultConfig(scale cachescale.Func) Config { MaxRandomTxHashesSend: 250, // match softLimitItems to fit into one message RandomTxHashesSendPeriod: 1 * time.Second, PeerInfoCollectionPeriod: 3 * time.Second, + PeerEndPointUpdatePeriod: 1 * time.Minute, PeerCache: DefaultPeerCacheConfig(scale), }, diff --git a/gossip/handler.go b/gossip/handler.go index c0d5e43a1..1a550cb1b 100644 --- a/gossip/handler.go +++ b/gossip/handler.go @@ -88,6 +88,12 @@ type handlerConfig struct { s *Store process processCallback localId enode.ID + + localEndPointSource LocalEndPointSource +} + +type LocalEndPointSource interface { + GetLocalEndPoint() *enode.Node } type handler struct { @@ -145,6 +151,8 @@ type handler struct { connectionAdvisor topology.ConnectionAdvisor nextSuggestedPeer chan *enode.Node + localEndPointSource LocalEndPointSource + logger.Instance } @@ -174,6 +182,8 @@ func newHandler( connectionAdvisor: topology.NewConnectionAdvisor(c.localId), nextSuggestedPeer: make(chan *enode.Node, 1), + localEndPointSource: c.localEndPointSource, + Instance: logger.New("PM"), } h.started.Add(1) @@ -865,8 +875,12 @@ func (h *handler) handleMsg(p *peer) error { if peer.Useless() { continue } + info := peer.endPoint.Load() + if info == nil { + continue + } infos = append(infos, peerInfo{ - Enode: peer.Node().String(), + Enode: info.enode.String(), }) } err := p2p.Send(p.rw, PeerInfosMsg, peerInfoMsg{ @@ -894,6 +908,34 @@ func (h *handler) handleMsg(p *peer) error { h.connectionAdvisor.UpdatePeers(p.ID(), reportedPeers) + case msg.Code == GetEndPointMsg: + source := h.localEndPointSource + if source == nil { + return nil + } + enode := source.GetLocalEndPoint() + if enode == nil { + return nil + } + if err := p2p.Send(p.rw, EndPointUpdateMsg, enode.String()); err != nil { + return err + } + + case msg.Code == EndPointUpdateMsg: + var encoded string + if err := msg.Decode(&encoded); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + var enode enode.Node + if err := enode.UnmarshalText([]byte(encoded)); err != nil { + h.Log.Warn("Failed to unmarshal enode", "enode", encoded, "err", err) + } else { + p.endPoint.Store(&peerEndPointInfo{ + enode: enode, + timestamp: time.Now(), + }) + } + default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -1103,6 +1145,10 @@ func (h *handler) peerInfoCollectionLoop(stop <-chan struct{}) { // Request updated peer information from current peers. peers := h.peers.List() for _, peer := range peers { + // If we do not have the peer's end-point or it is too old, request it. + if info := peer.endPoint.Load(); info == nil || time.Since(info.timestamp) > h.config.Protocol.PeerEndPointUpdatePeriod { + peer.SendEndPointUpdateRequest() + } peer.SendPeerInfoRequest() } diff --git a/gossip/peer.go b/gossip/peer.go index 60346fec4..b508625ae 100644 --- a/gossip/peer.go +++ b/gossip/peer.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" "github.com/Fantom-foundation/go-opera/gossip/protocols/dag/dagstream" @@ -71,6 +72,13 @@ type peer struct { useless uint32 sync.RWMutex + + endPoint atomic.Pointer[peerEndPointInfo] +} + +type peerEndPointInfo struct { + enode enode.Node + timestamp time.Time } func (p *peer) Useless() bool { @@ -542,12 +550,24 @@ func (p *peer) SendPeerInfoRequest() error { // If the peer doesn't support the peer info protocol, don't bother // sending the request. This request would lead to a disconnect // if the peer doesn't understand it. - if !p.Peer.RunningCap(ProtocolName, []uint{_Sonic_64}) { + if !p.Peer.RunningCap(ProtocolName, []uint{_Sonic_64, _Sonic_65}) { return nil } return p2p.Send(p.rw, GetPeerInfosMsg, struct{}{}) } +// SendEndPointUpdateRequest sends a request to the peer asking for the peer's +// public enode address to be used to establish a connection to this peer. +func (p *peer) SendEndPointUpdateRequest() error { + // If the peer doesn't support version 65 of this protocol, don't bother + // sending the request. This request would lead to a disconnect + // if the peer doesn't understand it. + if !p.Peer.RunningCap(ProtocolName, []uint{_Sonic_65}) { + return nil + } + return p2p.Send(p.rw, GetEndPointMsg, struct{}{}) +} + // String implements fmt.Stringer. func (p *peer) String() string { return fmt.Sprintf("Peer %s [%s]", p.id, diff --git a/gossip/protocol.go b/gossip/protocol.go index e1de940e0..145118da3 100644 --- a/gossip/protocol.go +++ b/gossip/protocol.go @@ -18,6 +18,7 @@ import ( const ( _FTM62 = 62 _Sonic_64 = 64 + _Sonic_65 = 65 ) // ProtocolName is the official short name of the protocol used during capability negotiation. @@ -25,12 +26,14 @@ const ProtocolName = "opera" // ProtocolVersions are the supported versions of the protocol (first is primary). var ProtocolVersions = []uint{ + _Sonic_65, _Sonic_64, _FTM62, } // protocolLengths are the number of implemented message corresponding to different protocol versions. var protocolLengths = map[uint]uint64{ + _Sonic_65: EndPointUpdateMsg + 1, _Sonic_64: PeerInfosMsg + 1, _FTM62: EventsStreamResponse + 1, } @@ -70,6 +73,11 @@ const ( GetPeerInfosMsg = 10 // Contains the list of known peers and their information. PeerInfosMsg = 11 + + // Request the enode of the peer identifying its public end-point. + GetEndPointMsg = 12 + // Contains the enode including the public end-point of the sender. + EndPointUpdateMsg = 13 ) type errCode int diff --git a/gossip/service.go b/gossip/service.go index ed51bb0f9..4c1a023b5 100644 --- a/gossip/service.go +++ b/gossip/service.go @@ -255,6 +255,7 @@ func newService(config Config, store *Store, blockProc BlockProc, engine lachesi }, SwitchEpochTo: svc.SwitchEpochTo, }, + localEndPointSource: localEndPointSource{svc}, }) if err != nil { return nil, err @@ -271,6 +272,14 @@ func newService(config Config, store *Store, blockProc BlockProc, engine lachesi return svc, nil } +type localEndPointSource struct { + service *Service +} + +func (s localEndPointSource) GetLocalEndPoint() *enode.Node { + return s.service.p2pServer.LocalNode().Node() +} + // makeCheckers builds event checkers func makeCheckers(heavyCheckCfg heavycheck.Config, txSigner types.Signer, heavyCheckReader *HeavyCheckReader, gasPowerCheckReader *GasPowerCheckReader, store *Store) *eventcheck.Checkers { // create signatures checker diff --git a/version/version.go b/version/version.go index e066e78e0..66f52e41b 100644 --- a/version/version.go +++ b/version/version.go @@ -7,7 +7,7 @@ import ( const ( VersionMajor = 2 // Major version component of the current release VersionMinor = 0 // Minor version component of the current release - VersionPatch = 0 // Patch version component of the current release + VersionPatch = 1 // Patch version component of the current release VersionMeta = "" // Version metadata to append to the version string )