Skip to content

Commit

Permalink
polygon/sync: add bridge unwind logic for astrid (#12071)
Browse files Browse the repository at this point in the history
handles first part of #11533
which relates to calling bridge.Unwind via Astrid
  • Loading branch information
taratorio authored Sep 29, 2024
1 parent bf0d328 commit 40bb103
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 65 deletions.
67 changes: 22 additions & 45 deletions eth/stagedsync/stage_polygon_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func NewPolygonSyncStageCfg(
KeepSpanBlockProducerSelections: true,
KeepCheckpoints: true,
KeepMilestones: true,
// below are handled via the Bridge.Unwind logic in Astrid
KeepEventNums: true,
KeepEventProcessedBlocks: true,
}
if len(userUnwindTypeOverrides) > 0 {
unwindCfg.ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides)
Expand Down Expand Up @@ -334,13 +337,13 @@ func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindCfg HeimdallUnwindCfg) err
}

if !unwindCfg.KeepEventNums {
if err := UnwindEventNums(tx, u.UnwindPoint); err != nil {
if err := bridge.UnwindBlockNumToEventID(tx, u.UnwindPoint); err != nil {
return err
}
}

if !unwindCfg.KeepEventProcessedBlocks {
if err := UnwindEventProcessedBlocks(tx, u.UnwindPoint); err != nil {
if err := bridge.UnwindEventProcessedBlocks(tx, u.UnwindPoint); err != nil {
return err
}
}
Expand Down Expand Up @@ -419,44 +422,6 @@ func UnwindEvents(tx kv.RwTx, unwindPoint uint64) error {
return err
}

func UnwindEventNums(tx kv.RwTx, unwindPoint uint64) error {
c, err := tx.RwCursor(kv.BorEventNums)
if err != nil {
return err
}

defer c.Close()
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1)
var k []byte
for k, _, err = c.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = c.Next() {
if err = c.DeleteCurrent(); err != nil {
return err
}
}

return err
}

func UnwindEventProcessedBlocks(tx kv.RwTx, unwindPoint uint64) error {
c, err := tx.RwCursor(kv.BorEventProcessedBlocks)
if err != nil {
return err
}

defer c.Close()
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1)
var k []byte
for k, _, err = c.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = c.Next() {
if err = c.DeleteCurrent(); err != nil {
return err
}
}

return err
}

func UnwindSpans(tx kv.RwTx, unwindPoint uint64) error {
cursor, err := tx.RwCursor(kv.BorSpans)
if err != nil {
Expand Down Expand Up @@ -1319,6 +1284,23 @@ func (s polygonSyncStageBridgeStore) PutBlockNumToEventID(ctx context.Context, b
return r.err
}

// Unwind delete unwindable bridge data.
// The blockNum parameter is exclusive, i.e. only data in the range (blockNum, last] is deleted.
func (s polygonSyncStageBridgeStore) Unwind(ctx context.Context, blockNum uint64) error {
type response struct {
err error
}

r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, respond func(r response) error) error {
return respond(response{err: bridge.Unwind(tx, blockNum)})
})
if err != nil {
return err
}

return r.err
}

func (s polygonSyncStageBridgeStore) Events(context.Context, uint64, uint64) ([][]byte, error) {
// used for accessing events in execution
// astrid stage integration intends to use the bridge only for scrapping
Expand Down Expand Up @@ -1349,11 +1331,6 @@ func (s polygonSyncStageBridgeStore) PutEventTxnToBlockNum(context.Context, map[
return nil
}

func (s polygonSyncStageBridgeStore) PruneEventIDs(context.Context, uint64) error {
// at time of writing, pruning for Astrid stage loop integration is handled via the stage loop mechanisms
panic("polygonSyncStageBridgeStore.PruneEventIDs not supported")
}

func (s polygonSyncStageBridgeStore) Prepare(context.Context) error {
// no-op
return nil
Expand Down
30 changes: 26 additions & 4 deletions polygon/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"sync/atomic"
"time"

liberrors "github.com/erigontech/erigon-lib/common/errors"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/log/v3"
bortypes "github.com/erigontech/erigon/polygon/bor/types"
"github.com/erigontech/erigon/polygon/polygoncommon"

libcommon "github.com/erigontech/erigon-lib/common"
liberrors "github.com/erigontech/erigon-lib/common/errors"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/polygon/heimdall"
Expand Down Expand Up @@ -85,6 +85,7 @@ type Bridge struct {
processedBlocksSignal chan struct{}
lastProcessedBlockInfo atomic.Pointer[ProcessedBlockInfo]
synchronizeMu sync.Mutex
unwindMu sync.Mutex
}

func (b *Bridge) Run(ctx context.Context) error {
Expand Down Expand Up @@ -234,6 +235,9 @@ func (b *Bridge) ProcessNewBlocks(ctx context.Context, blocks []*types.Block) er
return nil
}

b.unwindMu.Lock()
defer b.unwindMu.Unlock()

lastProcessedEventID, err := b.store.LastProcessedEventID(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -357,10 +361,28 @@ func (b *Bridge) Synchronize(ctx context.Context, blockNum uint64) error {
return b.waitForProcessedBlock(ctx, blockNum)
}

// Unwind deletes map entries till tip
// Unwind delete unwindable bridge data.
// The blockNum parameter is exclusive, i.e. only data in the range (blockNum, last] is deleted.
func (b *Bridge) Unwind(ctx context.Context, blockNum uint64) error {
// TODO need to handle unwinds via astrid - will do in separate PR
return b.store.PruneEventIDs(ctx, blockNum)
b.logger.Debug(bridgeLogPrefix("unwinding"), "blockNum", blockNum)

b.unwindMu.Lock()
defer b.unwindMu.Unlock()

if err := b.store.Unwind(ctx, blockNum); err != nil {
return err
}

lastProcessedBlockInfo, ok, err := b.store.LastProcessedBlockInfo(ctx)
if err != nil {
return err
}
if !ok {
return errors.New("no last processed block info after unwind")
}

b.lastProcessedBlockInfo.Store(&lastProcessedBlockInfo)
return nil
}

// Events returns all sync events at blockNum
Expand Down
108 changes: 95 additions & 13 deletions polygon/bridge/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ type Store interface {
LastProcessedBlockInfo(ctx context.Context) (ProcessedBlockInfo, bool, error)
PutProcessedBlockInfo(ctx context.Context, info ProcessedBlockInfo) error
LastFrozenEventBlockNum() uint64
PruneEventIDs(ctx context.Context, blockNum uint64) error
// Unwind deletes unwindable bridge data.
// The blockNum parameter is exclusive, i.e. only data in the range (blockNum, last] is deleted.
Unwind(ctx context.Context, blockNum uint64) error
}

type MdbxStore struct {
Expand Down Expand Up @@ -442,35 +444,115 @@ func (s *MdbxStore) BlockEventIDsRange(ctx context.Context, blockNum uint64) (ui
return start, end, nil
}

func (s *MdbxStore) PruneEventIDs(ctx context.Context, blockNum uint64) error {
//
// TODO rename func to Unwind, unwind BorEventProcessedBlocks, BorTxnLookup - in separate PR
//

// Unwind deletes unwindable bridge data.
// The blockNum parameter is exclusive, i.e. only data in the range (blockNum, last] is deleted.
func (s *MdbxStore) Unwind(ctx context.Context, blockNum uint64) error {
tx, err := s.db.BeginRw(ctx)
if err != nil {
return err
}

defer tx.Rollback()
if err := Unwind(tx, blockNum); err != nil {
return err
}

kByte := make([]byte, 8)
binary.BigEndian.PutUint64(kByte, blockNum)
return tx.Commit()
}

cursor, err := tx.Cursor(kv.BorEventNums)
// Unwind deletes unwindable bridge data.
// The blockNum parameter is exclusive, i.e. only data in the range (blockNum, last] is deleted.
func Unwind(tx kv.RwTx, blockNum uint64) error {
if err := UnwindBlockNumToEventID(tx, blockNum); err != nil {
return err
}

if err := UnwindEventProcessedBlocks(tx, blockNum); err != nil {
return err
}

return UnwindEventTxnToBlockNum(tx, blockNum)
}

// UnwindEventProcessedBlocks deletes data in kv.BorEventProcessedBlocks.
// The blockNum parameter is exclusive, i.e. only data in the range (blockNum, last] is deleted.
func UnwindBlockNumToEventID(tx kv.RwTx, blockNum uint64) error {
c, err := tx.RwCursor(kv.BorEventNums)
if err != nil {
return err
}
defer cursor.Close()

defer c.Close()
var k []byte
for k, _, err = cursor.Seek(kByte); err == nil && k != nil; k, _, err = cursor.Next() {
if err := tx.Delete(kv.BorEventNums, k); err != nil {
for k, _, err = c.Last(); err == nil && k != nil; k, _, err = c.Prev() {
if currentBlockNum := binary.BigEndian.Uint64(k); currentBlockNum <= blockNum {
break
}

if err = c.DeleteCurrent(); err != nil {
return err
}
}

return err
}

// UnwindEventProcessedBlocks deletes data in kv.BorEventProcessedBlocks.
// The blockNum parameter is exclusive, i.e. only data in the range (blockNum, last] is deleted.
func UnwindEventProcessedBlocks(tx kv.RwTx, blockNum uint64) error {
c, err := tx.RwCursor(kv.BorEventProcessedBlocks)
if err != nil {
return err
}

return tx.Commit()
defer c.Close()
firstK, _, err := c.First()
if err != nil {
return err
}
if len(firstK) == 0 {
return errors.New("unexpected missing first processed block info entry when unwinding")
}
if first := binary.BigEndian.Uint64(firstK); blockNum < first {
// we always want to have at least 1 entry in the table
return fmt.Errorf("unwind blockNumber is too far back: first=%d, unwind=%d", first, blockNum)
}

var k []byte
for k, _, err = c.Last(); err == nil && k != nil; k, _, err = c.Prev() {
if currentBlockNum := binary.BigEndian.Uint64(k); currentBlockNum <= blockNum {
break
}

if err = c.DeleteCurrent(); err != nil {
return err
}
}

return err
}

// UnwindEventProcessedBlocks deletes data in kv.BorTxLookup.
// The blockNum parameter is exclusive, i.e. only data in the range (blockNum, last] is deleted.
func UnwindEventTxnToBlockNum(tx kv.RwTx, blockNum uint64) error {
c, err := tx.RwCursor(kv.BorTxLookup)
if err != nil {
return err
}

defer c.Close()
blockNumBytes := make([]byte, 8)
binary.BigEndian.PutUint64(blockNumBytes, blockNum)
var k, v []byte
for k, v, err = c.Last(); err == nil && k != nil; k, v, err = c.Prev() {
if currentBlockNum := binary.BigEndian.Uint64(v); currentBlockNum <= blockNum {
break
}

if err = c.DeleteCurrent(); err != nil {
return err
}
}

return err
}
16 changes: 13 additions & 3 deletions polygon/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type heimdallSynchronizer interface {

type bridgeSynchronizer interface {
Synchronize(ctx context.Context, blockNum uint64) error
Unwind(ctx context.Context, blockNum uint64) error
}

type Sync struct {
Expand Down Expand Up @@ -106,6 +107,7 @@ func (s *Sync) handleMilestoneTipMismatch(
) error {
// the milestone doesn't correspond to the tip of the chain
// unwind to the previous verified milestone
// and download the blocks of the new milestone
oldTip := ccBuilder.Root()
oldTipNum := oldTip.Number.Uint64()

Expand All @@ -118,7 +120,7 @@ func (s *Sync) handleMilestoneTipMismatch(
"milestoneRootHash", milestone.RootHash(),
)

if err := s.execution.UpdateForkChoice(ctx, oldTip, oldTip); err != nil {
if err := s.bridgeSync.Unwind(ctx, oldTipNum); err != nil {
return err
}

Expand All @@ -139,7 +141,6 @@ func (s *Sync) handleMilestoneTipMismatch(
}

ccBuilder.Reset(newTip)

return nil
}

Expand Down Expand Up @@ -246,13 +247,22 @@ func (s *Sync) applyNewBlockOnTip(
return nil
}

newTip := ccBuilder.Tip()
firstConnectedHeader := newConnectedHeaders[0]
if newTip != oldTip && oldTip.Hash() != firstConnectedHeader.ParentHash {
// forks have changed, we need to unwind unwindable data
blockNum := max(1, firstConnectedHeader.Number.Uint64()) - 1
if err := s.bridgeSync.Unwind(ctx, blockNum); err != nil {
return err
}
}

// len(newConnectedHeaders) is always <= len(blockChain)
newConnectedBlocks := blockChain[len(blockChain)-len(newConnectedHeaders):]
if err := s.store.InsertBlocks(ctx, newConnectedBlocks); err != nil {
return err
}

newTip := ccBuilder.Tip()
if newTip == oldTip {
return nil
}
Expand Down

0 comments on commit 40bb103

Please sign in to comment.