diff --git a/bridgesync/bridgesync.go b/bridgesync/bridgesync.go index b3c3c853..f07641c5 100644 --- a/bridgesync/bridgesync.go +++ b/bridgesync/bridgesync.go @@ -171,30 +171,51 @@ func (s *BridgeSync) Start(ctx context.Context) { } func (s *BridgeSync) GetLastProcessedBlock(ctx context.Context) (uint64, error) { + if s.processor.isHalted() { + return 0, sync.ErrInconsistentState + } return s.processor.GetLastProcessedBlock(ctx) } func (s *BridgeSync) GetBridgeRootByHash(ctx context.Context, root common.Hash) (*tree.Root, error) { + if s.processor.isHalted() { + return nil, sync.ErrInconsistentState + } return s.processor.exitTree.GetRootByHash(ctx, root) } func (s *BridgeSync) GetClaims(ctx context.Context, fromBlock, toBlock uint64) ([]Claim, error) { + if s.processor.isHalted() { + return nil, sync.ErrInconsistentState + } return s.processor.GetClaims(ctx, fromBlock, toBlock) } func (s *BridgeSync) GetBridges(ctx context.Context, fromBlock, toBlock uint64) ([]Bridge, error) { + if s.processor.isHalted() { + return nil, sync.ErrInconsistentState + } return s.processor.GetBridges(ctx, fromBlock, toBlock) } func (s *BridgeSync) GetBridgesPublished(ctx context.Context, fromBlock, toBlock uint64) ([]Bridge, error) { + if s.processor.isHalted() { + return nil, sync.ErrInconsistentState + } return s.processor.GetBridgesPublished(ctx, fromBlock, toBlock) } func (s *BridgeSync) GetProof(ctx context.Context, depositCount uint32, localExitRoot common.Hash) (tree.Proof, error) { + if s.processor.isHalted() { + return tree.Proof{}, sync.ErrInconsistentState + } return s.processor.exitTree.GetProof(ctx, depositCount, localExitRoot) } func (s *BridgeSync) GetBlockByLER(ctx context.Context, ler common.Hash) (uint64, error) { + if s.processor.isHalted() { + return 0, sync.ErrInconsistentState + } root, err := s.processor.exitTree.GetRootByHash(ctx, ler) if err != nil { return 0, err @@ -203,6 +224,9 @@ func (s *BridgeSync) GetBlockByLER(ctx context.Context, ler common.Hash) 
(uint64 } func (s *BridgeSync) GetRootByLER(ctx context.Context, ler common.Hash) (*tree.Root, error) { + if s.processor.isHalted() { + return nil, sync.ErrInconsistentState + } root, err := s.processor.exitTree.GetRootByHash(ctx, ler) if err != nil { return root, err @@ -212,6 +236,9 @@ func (s *BridgeSync) GetRootByLER(ctx context.Context, ler common.Hash) (*tree.R // GetExitRootByIndex returns the root of the exit tree at the moment the leaf with the given index was added func (s *BridgeSync) GetExitRootByIndex(ctx context.Context, index uint32) (tree.Root, error) { + if s.processor.isHalted() { + return tree.Root{}, sync.ErrInconsistentState + } return s.processor.exitTree.GetRootByIndex(ctx, index) } diff --git a/bridgesync/e2e_test.go b/bridgesync/e2e_test.go index 8072e8f5..9ac56a7d 100644 --- a/bridgesync/e2e_test.go +++ b/bridgesync/e2e_test.go @@ -16,10 +16,10 @@ import ( func TestBridgeEventE2E(t *testing.T) { const ( - totalBridges = 100 - totalReorgs = 10 + totalBridges = 1000 + totalReorgs = 700 maxReorgDepth = 5 - reorgEveryXIterations = 10 + reorgEveryXIterations = 7 // every X blocks go back [1,maxReorgDepth] blocks ) env := helpers.NewE2EEnvWithEVML2(t) ctx := context.Background() @@ -48,7 +48,7 @@ func TestBridgeEventE2E(t *testing.T) { true, nil, ) require.NoError(t, err) - helpers.CommitBlocks(t, env.L1Client, 1, time.Millisecond*100) + helpers.CommitBlocks(t, env.L1Client, 1, time.Millisecond) bn, err := env.L1Client.Client().BlockNumber(ctx) require.NoError(t, err) bridge.BlockNum = bn @@ -89,7 +89,7 @@ func TestBridgeEventE2E(t *testing.T) { } // Wait for syncer to catch up - time.Sleep(time.Second) // sleeping since the processor could be up to date, but have pending reorgs + time.Sleep(time.Second * 2) // sleeping since the processor could be up to date, but have pending reorgs lb, err := env.L1Client.Client().BlockNumber(ctx) require.NoError(t, err) helpers.RequireProcessorUpdated(t, env.BridgeL1Sync, lb) diff --git 
a/bridgesync/processor.go b/bridgesync/processor.go index d1eacefe..50bcbc42 100644 --- a/bridgesync/processor.go +++ b/bridgesync/processor.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "math/big" + mutex "sync" "github.com/0xPolygon/cdk/bridgesync/migrations" "github.com/0xPolygon/cdk/db" @@ -99,9 +100,12 @@ type Event struct { } type processor struct { - db *sql.DB - exitTree *tree.AppendOnlyTree - log *log.Logger + db *sql.DB + exitTree *tree.AppendOnlyTree + log *log.Logger + mu mutex.RWMutex + halted bool + haltedReason string } func newProcessor(dbPath, loggerPrefix string) (*processor, error) { @@ -243,7 +247,11 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error { } }() - _, err = tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock) + res, err := tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock) + if err != nil { + return err + } + rowsAffected, err := res.RowsAffected() if err != nil { return err } @@ -254,6 +262,12 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error { if err := tx.Commit(); err != nil { return err } + if rowsAffected > 0 { + p.mu.Lock() + defer p.mu.Unlock() + p.halted = false + p.haltedReason = "" + } log.Debug("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx BRIDGE SYNC xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx>") return nil @@ -262,6 +276,10 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error { // ProcessBlock process the events of the block to build the exit tree // and updates the last processed block (can be called without events for that purpose) func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error { + if p.isHalted() { + log.Errorf("processor is halted due to: %s", p.haltedReason) + return sync.ErrInconsistentState + } tx, err := db.NewTx(ctx, p.db) if err != nil { return err @@ -289,7 +307,13 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error { Index: event.Bridge.DepositCount, 
Hash: event.Bridge.Hash(), }); err != nil { - return err + if errors.Is(err, tree.ErrInvalidIndex) { + p.mu.Lock() + p.halted = true + p.haltedReason = fmt.Sprintf("error adding leaf to the exit tree: %v", err) + p.mu.Unlock() + } + return sync.ErrInconsistentState } if err = meddler.Insert(tx, "bridge", event.Bridge); err != nil { return err @@ -377,3 +401,9 @@ func DecodeGlobalIndex(globalIndex *big.Int) (mainnetFlag bool, func convertBytesToUint32(bytes []byte) uint32 { return uint32(big.NewInt(0).SetBytes(bytes).Uint64()) } + +func (p *processor) isHalted() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.halted +} diff --git a/db/sqlite.go b/db/sqlite.go index df0c1d28..d12cd094 100644 --- a/db/sqlite.go +++ b/db/sqlite.go @@ -3,6 +3,7 @@ package db import ( "database/sql" "errors" + "fmt" _ "github.com/mattn/go-sqlite3" ) @@ -17,17 +18,18 @@ var ( // NewSQLiteDB creates a new SQLite DB func NewSQLiteDB(dbPath string) (*sql.DB, error) { - db, err := sql.Open("sqlite3", dbPath) - if err != nil { - return nil, err - } - _, err = db.Exec(` - PRAGMA foreign_keys = ON; - pragma journal_mode = WAL; - pragma synchronous = normal; - pragma journal_size_limit = 6144000; - `) - return db, err + return sql.Open("sqlite3", fmt.Sprintf("file:%s?_txlock=exclusive&_foreign_keys=on", dbPath)) + // db,err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_foreign_keys=on", dbPath)) + // if err != nil { + // return nil, err + // } + // _, err = db.Exec(` + // PRAGMA foreign_keys = ON; + // pragma journal_mode = WAL; + // pragma synchronous = normal; + // pragma journal_size_limit = 6144000; + // `) + // return db, err } func ReturnErrNotFound(err error) error { diff --git a/l1infotreesync/cascade_test.go b/l1infotreesync/cascade_test.go index ca980161..61e0ad11 100644 --- a/l1infotreesync/cascade_test.go +++ b/l1infotreesync/cascade_test.go @@ -13,11 +13,11 @@ import ( ) func TestCascade(t *testing.T) { - dbPath := path.Join(t.TempDir(), "file::memory:?cache=shared") + dbPath 
:= path.Join(t.TempDir(), "TestCascade.sqlite") p, err := newProcessor(dbPath) require.NoError(t, err) ctx := context.Background() - for i := 1; i < 10000; i++ { + for i := 1; i < 10_000; i++ { // insert block and info tx, err := db.NewTx(ctx, p.db) require.NoError(t, err) @@ -42,13 +42,13 @@ func TestCascade(t *testing.T) { if i%3 == 0 { tx, err = db.NewTx(ctx, p.db) require.NoError(t, err) - _, err = tx.Exec(`delete from block where num >= $1;`, i-1) + _, err = tx.Exec(`delete from block where num >= $1;`, i) require.NoError(t, err) require.NoError(t, tx.Commit()) // assert that info table is empty info, err = p.GetLastInfo() require.NoError(t, err) - require.Equal(t, info.BlockNumber, uint64(i-2)) + require.Equal(t, info.BlockNumber, uint64(i-1)) } } } diff --git a/l1infotreesync/processor.go b/l1infotreesync/processor.go index dad3b217..480fc806 100644 --- a/l1infotreesync/processor.go +++ b/l1infotreesync/processor.go @@ -251,12 +251,19 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error { } }() + info := &L1InfoTreeLeaf{} + err = meddler.QueryRow(tx, info, ` + SELECT * FROM l1info_leaf + ORDER BY block_num DESC, block_pos DESC + LIMIT 1; + `) + log.Debugf("info before delete: %d", info.BlockNumber) log.Debug("xxxxxxxxxxxx going to delete") res, err := tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock) if err != nil { return err } - info := &L1InfoTreeLeaf{} + info = &L1InfoTreeLeaf{} err = meddler.QueryRow(tx, info, ` SELECT * FROM l1info_leaf ORDER BY block_num DESC, block_pos DESC @@ -266,7 +273,7 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error { panic("nope " + err.Error()) } if info.BlockNumber >= firstReorgedBlock { - log.Fatal("on the tx 1 !!!!! 
unsuccessful reorg, l1 info table has invalid info") + log.Fatalf("on the tx: unsuccessful reorg, l1 info table still has a leaf at block %d (first reorged block: %d)", info.BlockNumber, firstReorgedBlock) } log.Debug("done deleting xxxxxxxxxxxx") diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 91d21354..0b0a4025 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -141,6 +141,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { headers := hdrs.getSorted() for _, hdr := range headers { // Get the actual header from the network or from the cache + var err error headersCacheLock.Lock() currentHeader, ok := headersCache[hdr.Num] if !ok || currentHeader == nil { diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index 74b8ec7c..f0d49282 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -14,6 +14,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) +const ( + DefaultWaitPeriodBlockNotFound = time.Millisecond * 100 +) + type EthClienter interface { ethereum.LogFilterer ethereum.BlockNumberReader @@ -276,6 +280,17 @@ func (d *EVMDownloaderImplementation) GetBlockHeader(ctx context.Context, blockN // context is canceled, we don't want to fatal on max attempts in this case return EVMBlockHeader{}, true } + if errors.Is(err, ethereum.NotFound) { + // block num can temporarily disappear from the execution client due to a reorg, + // in this case, we want to wait and not panic + log.Warnf("block %d not found on the ethereum client: %v", blockNum, err) + if d.rh.RetryAfterErrorPeriod != 0 { + time.Sleep(d.rh.RetryAfterErrorPeriod) + } else { + time.Sleep(DefaultWaitPeriodBlockNotFound) + } + continue + } attempts++ d.log.Errorf("error getting block header for block %d, err: %v", blockNum, err) diff --git a/sync/evmdriver.go b/sync/evmdriver.go index a8d4b8ac..b8e706b9 100644 --- a/sync/evmdriver.go +++ b/sync/evmdriver.go @@ -152,7 +152,7 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, cancel context.CancelFun return } attempts++ -
d.log.Errorf("error processing events for block %d, err: ", b.Num, err) + d.log.Errorf("error processing events for block %d, err: %v", b.Num, err) d.rh.Handle("handleNewBlock", attempts) } else { succeed = true diff --git a/tree/appendonlytree.go b/tree/appendonlytree.go index 5b14b962..7dbead01 100644 --- a/tree/appendonlytree.go +++ b/tree/appendonlytree.go @@ -6,10 +6,15 @@ import ( "fmt" "github.com/0xPolygon/cdk/db" + "github.com/0xPolygon/cdk/log" "github.com/0xPolygon/cdk/tree/types" "github.com/ethereum/go-ethereum/common" ) +var ( + ErrInvalidIndex = errors.New("invalid index") +) + // AppendOnlyTree is a tree where leaves are added sequentially (by index) type AppendOnlyTree struct { *Tree @@ -35,10 +40,11 @@ func (t *AppendOnlyTree) AddLeaf(tx db.Txer, blockNum, blockPosition uint64, lea return err } if int64(leaf.Index) != t.lastIndex+1 { - return fmt.Errorf( + log.Errorf( "mismatched index. Expected: %d, actual: %d", t.lastIndex+1, leaf.Index, ) + return ErrInvalidIndex } } // Calculate new tree nodes @@ -74,7 +80,10 @@ func (t *AppendOnlyTree) AddLeaf(tx db.Txer, blockNum, blockPosition uint64, lea return err } t.lastIndex++ - tx.AddRollbackCallback(func() { t.lastIndex-- }) + tx.AddRollbackCallback(func() { + log.Debugf("decreasing index due to rollback") + t.lastIndex-- + }) return nil }