Skip to content

Commit

Permalink
polygon/sync: notify about latest waypoint end block on initial sync (#…
Browse files Browse the repository at this point in the history
…12712)

`eth_syncing` API used to flicker between syncing and not syncing during
initial Astrid sync. This is due to how the initial sync algorithm works
and the new last new block seen notification logic.

E.g.
![Screenshot 2024-11-12 at 14 52
48](https://github.com/user-attachments/assets/76f648d1-f478-4502-97ae-827e4afd680d)


This PR fixes this.
  • Loading branch information
taratorio authored Nov 13, 2024
1 parent 7e05c5d commit 747c254
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ func stagePolygonSync(db kv.RwDB, ctx context.Context, logger log.Logger) error

stageState := stage(stageSync, tx, nil, stages.PolygonSync)
cfg := stagedsync.NewPolygonSyncStageCfg(logger, chainConfig, nil, heimdallClient,
heimdallStore, bridgeStore, nil, 0, nil, blockReader, nil, 0, unwindTypes)
heimdallStore, bridgeStore, nil, 0, nil, blockReader, nil, 0, unwindTypes, nil /* notifications */)
// we only need blockReader and blockWriter (blockWriter is constructed in NewPolygonSyncStageCfg)
if unwind > 0 {
u := stageSync.NewUnwindState(stageState.ID, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false)
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
config.LoopBlockLimit,
polygonBridge,
heimdallService,
backend.notifications,
)

// we need to initiate download before the heimdall services start rather than
Expand Down
7 changes: 5 additions & 2 deletions eth/stagedsync/stage_polygon_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
polygonsync "github.com/erigontech/erigon/polygon/sync"
"github.com/erigontech/erigon/rlp"
"github.com/erigontech/erigon/turbo/services"
"github.com/erigontech/erigon/turbo/shards"
)

var errBreakPolygonSyncStage = errors.New("break polygon sync stage")
Expand All @@ -64,6 +65,7 @@ func NewPolygonSyncStageCfg(
stopNode func() error,
blockLimit uint,
userUnwindTypeOverrides []string,
notifications *shards.Notifications,
) PolygonSyncStageCfg {
// using a buffered channel to preserve order of tx actions,
// do not expect to ever have more than 50 goroutines blocking on this channel
Expand Down Expand Up @@ -109,7 +111,7 @@ func NewPolygonSyncStageCfg(
Logger: logger,
BorConfig: borConfig,
EventFetcher: heimdallClient})
p2pService := p2p.NewService(maxPeers, logger, sentry, statusDataProvider.GetStatusData)
p2pService := p2p.NewService(logger, maxPeers, sentry, statusDataProvider.GetStatusData)
checkpointVerifier := polygonsync.VerifyCheckpointHeaders
milestoneVerifier := polygonsync.VerifyMilestoneHeaders
blocksVerifier := polygonsync.VerifyBlocks
Expand All @@ -126,6 +128,7 @@ func NewPolygonSyncStageCfg(
)
events := polygonsync.NewTipEvents(logger, p2pService, heimdallService)
sync := polygonsync.NewSync(
logger,
syncStore,
executionEngine,
milestoneVerifier,
Expand All @@ -136,7 +139,7 @@ func NewPolygonSyncStageCfg(
heimdallService,
bridgeService,
events.Events(),
logger,
notifications,
)
syncService := &polygonSyncStageService{
logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion polygon/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/erigontech/erigon/polygon/polygoncommon"
)

func NewService(maxPeers int, logger log.Logger, sc sentryproto.SentryClient, sdf sentry.StatusDataFactory) *Service {
func NewService(logger log.Logger, maxPeers int, sc sentryproto.SentryClient, sdf sentry.StatusDataFactory) *Service {
peerPenalizer := NewPeerPenalizer(sc)
messageListener := NewMessageListener(logger, sc, sdf, peerPenalizer)
peerTracker := NewPeerTracker(logger, sc, messageListener)
Expand Down
7 changes: 5 additions & 2 deletions polygon/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/erigontech/erigon/polygon/bridge"
"github.com/erigontech/erigon/polygon/heimdall"
"github.com/erigontech/erigon/polygon/p2p"
"github.com/erigontech/erigon/turbo/shards"
)

func NewService(
Expand All @@ -43,12 +44,13 @@ func NewService(
blockLimit uint,
bridgeService *bridge.Service,
heimdallService *heimdall.Service,
notifications *shards.Notifications,
) *Service {
borConfig := chainConfig.Bor.(*borcfg.BorConfig)
checkpointVerifier := VerifyCheckpointHeaders
milestoneVerifier := VerifyMilestoneHeaders
blocksVerifier := VerifyBlocks
p2pService := p2p.NewService(maxPeers, logger, sentryClient, statusDataProvider.GetStatusData)
p2pService := p2p.NewService(logger, maxPeers, sentryClient, statusDataProvider.GetStatusData)
execution := newExecutionClient(executionClient)
store := NewStore(logger, execution, bridgeService)
blockDownloader := NewBlockDownloader(
Expand All @@ -64,6 +66,7 @@ func NewService(
ccBuilderFactory := NewCanonicalChainBuilderFactory(chainConfig, borConfig, heimdallService)
events := NewTipEvents(logger, p2pService, heimdallService)
sync := NewSync(
logger,
store,
execution,
milestoneVerifier,
Expand All @@ -74,7 +77,7 @@ func NewService(
heimdallService,
bridgeService,
events.Events(),
logger,
notifications,
)
return &Service{
sync: sync,
Expand Down
75 changes: 34 additions & 41 deletions polygon/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/polygon/heimdall"
"github.com/erigontech/erigon/polygon/p2p"
"github.com/erigontech/erigon/turbo/shards"
)

type heimdallSynchronizer interface {
Expand All @@ -46,6 +47,7 @@ type bridgeSynchronizer interface {
}

func NewSync(
logger log.Logger,
store Store,
execution ExecutionClient,
milestoneVerifier WaypointHeadersVerifier,
Expand All @@ -56,14 +58,15 @@ func NewSync(
heimdallSync heimdallSynchronizer,
bridgeSync bridgeSynchronizer,
events <-chan Event,
logger log.Logger,
notifications *shards.Notifications,
) *Sync {
badBlocksLru, err := simplelru.NewLRU[common.Hash, struct{}](1024, nil)
if err != nil {
panic(err)
}

return &Sync{
logger: logger,
store: store,
execution: execution,
milestoneVerifier: milestoneVerifier,
Expand All @@ -75,11 +78,12 @@ func NewSync(
bridgeSync: bridgeSync,
events: events,
badBlocks: badBlocksLru,
logger: logger,
notifications: notifications,
}
}

type Sync struct {
logger log.Logger
store Store
execution ExecutionClient
milestoneVerifier WaypointHeadersVerifier
Expand All @@ -91,7 +95,7 @@ type Sync struct {
bridgeSync bridgeSynchronizer
events <-chan Event
badBlocks *simplelru.LRU[common.Hash, struct{}]
logger log.Logger
notifications *shards.Notifications
}

func (s *Sync) commitExecution(ctx context.Context, newTip *types.Header, finalizedHeader *types.Header) error {
Expand Down Expand Up @@ -739,61 +743,49 @@ func (s *Sync) syncToTip(ctx context.Context) (syncToTipResult, error) {
}

func (s *Sync) syncToTipUsingCheckpoints(ctx context.Context, tip *types.Header) (syncToTipResult, error) {
return s.sync(ctx, tip, func(ctx context.Context, startBlockNum uint64) (syncToTipResult, error) {
latestCheckpoint, err := s.heimdallSync.SynchronizeCheckpoints(ctx)
if err != nil {
return syncToTipResult{}, err
}

tip, err := s.blockDownloader.DownloadBlocksUsingCheckpoints(ctx, startBlockNum)
if err != nil {
return syncToTipResult{}, err
}

return syncToTipResult{latestTip: tip, latestWaypoint: latestCheckpoint}, nil
})
syncCheckpoints := func(ctx context.Context) (heimdall.Waypoint, error) {
return s.heimdallSync.SynchronizeCheckpoints(ctx)
}
return s.sync(ctx, tip, syncCheckpoints, s.blockDownloader.DownloadBlocksUsingCheckpoints)
}

func (s *Sync) syncToTipUsingMilestones(ctx context.Context, tip *types.Header) (syncToTipResult, error) {
return s.sync(ctx, tip, func(ctx context.Context, startBlockNum uint64) (syncToTipResult, error) {
latestMilestone, err := s.heimdallSync.SynchronizeMilestones(ctx)
if err != nil {
return syncToTipResult{}, err
}

tip, err := s.blockDownloader.DownloadBlocksUsingMilestones(ctx, startBlockNum)
if err != nil {
return syncToTipResult{}, err
}

return syncToTipResult{latestTip: tip, latestWaypoint: latestMilestone}, nil
})
syncMilestones := func(ctx context.Context) (heimdall.Waypoint, error) {
return s.heimdallSync.SynchronizeMilestones(ctx)
}
return s.sync(ctx, tip, syncMilestones, s.blockDownloader.DownloadBlocksUsingMilestones)
}

type tipDownloaderFunc func(ctx context.Context, startBlockNum uint64) (syncToTipResult, error)
type waypointSyncFunc func(ctx context.Context) (heimdall.Waypoint, error)
type blockDownloadFunc func(ctx context.Context, startBlockNum uint64) (*types.Header, error)

func (s *Sync) sync(ctx context.Context, tip *types.Header, tipDownloader tipDownloaderFunc) (syncToTipResult, error) {
var latestWaypoint heimdall.Waypoint
var startBlockNum uint64 = 1
func (s *Sync) sync(
ctx context.Context,
tip *types.Header,
waypointSync waypointSyncFunc,
blockDownload blockDownloadFunc,
) (syncToTipResult, error) {
var waypoint heimdall.Waypoint

for {
if tip != nil {
startBlockNum = tip.Number.Uint64() + 1
newWaypoint, err := waypointSync(ctx)
if err != nil {
return syncToTipResult{}, err
}

newResult, err := tipDownloader(ctx, startBlockNum)
// notify about latest waypoint end block so that eth_syncing API doesn't flicker on initial sync
s.notifications.NewLastBlockSeen(newWaypoint.EndBlock().Uint64())

newTip, err := blockDownload(ctx, tip.Number.Uint64()+1)
if err != nil {
return syncToTipResult{}, err
}

latestWaypoint = newResult.latestWaypoint

if newResult.latestTip == nil {
if newTip == nil {
// we've reached the tip
break
}

newTip := newResult.latestTip
if err := s.commitExecution(ctx, newTip, newTip); err != nil {
// note: if we face a failure during execution of finalized waypoints blocks, it means that
// we're wrong and the blocks are not considered as bad blocks, so we should terminate
Expand All @@ -802,9 +794,10 @@ func (s *Sync) sync(ctx context.Context, tip *types.Header, tipDownloader tipDow
}

tip = newTip
waypoint = newWaypoint
}

return syncToTipResult{latestTip: tip, latestWaypoint: latestWaypoint}, nil
return syncToTipResult{latestTip: tip, latestWaypoint: waypoint}, nil
}

func (s *Sync) handleWaypointExecutionErr(ctx context.Context, lastCorrectTip *types.Header, execErr error) error {
Expand Down
4 changes: 4 additions & 0 deletions turbo/shards/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ type Notifications struct {
LastNewBlockSeen atomic.Uint64 // This is used by eth_syncing as an heuristic to determine if the node is syncing or not.
}

func (n *Notifications) NewLastBlockSeen(blockNum uint64) {
n.LastNewBlockSeen.Store(blockNum)
}

func NewNotifications(StateChangesConsumer StateChangeConsumer) *Notifications {
return &Notifications{
Events: NewEvents(),
Expand Down
3 changes: 2 additions & 1 deletion turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (h *Hook) LastNewBlockSeen(n uint64) {
if h == nil || h.notifications == nil {
return
}
h.notifications.LastNewBlockSeen.Store(n)
h.notifications.NewLastBlockSeen(n)
}
func (h *Hook) BeforeRun(tx kv.Tx, inSync bool) error {
if h == nil {
Expand Down Expand Up @@ -819,6 +819,7 @@ func NewPolygonSyncStages(
stopNode,
config.LoopBlockLimit,
nil, /* userUnwindTypeOverrides */
notifications,
),
stagedsync.StageSendersCfg(db, chainConfig, config.Sync, false, config.Dirs.Tmp, config.Prune, blockReader, nil),
stagedsync.StageExecuteBlocksCfg(db, config.Prune, config.BatchSize, chainConfig, consensusEngine, &vm.Config{}, notifications, config.StateStream, false, false, config.ChaosMonkey, config.Dirs, blockReader, nil, config.Genesis, config.Sync, SilkwormForExecutionStage(silkworm, config)),
Expand Down

0 comments on commit 747c254

Please sign in to comment.