Skip to content

Commit

Permalink
test passing
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Nov 15, 2024
1 parent d41ebba commit dac8cfc
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 30 deletions.
27 changes: 27 additions & 0 deletions bridgesync/bridgesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,30 +171,51 @@ func (s *BridgeSync) Start(ctx context.Context) {
}

func (s *BridgeSync) GetLastProcessedBlock(ctx context.Context) (uint64, error) {
if s.processor.isHalted() {
return 0, sync.ErrInconsistentState
}
return s.processor.GetLastProcessedBlock(ctx)
}

func (s *BridgeSync) GetBridgeRootByHash(ctx context.Context, root common.Hash) (*tree.Root, error) {
if s.processor.isHalted() {
return nil, sync.ErrInconsistentState
}
return s.processor.exitTree.GetRootByHash(ctx, root)
}

func (s *BridgeSync) GetClaims(ctx context.Context, fromBlock, toBlock uint64) ([]Claim, error) {
if s.processor.isHalted() {
return nil, sync.ErrInconsistentState
}
return s.processor.GetClaims(ctx, fromBlock, toBlock)
}

func (s *BridgeSync) GetBridges(ctx context.Context, fromBlock, toBlock uint64) ([]Bridge, error) {
if s.processor.isHalted() {
return nil, sync.ErrInconsistentState
}
return s.processor.GetBridges(ctx, fromBlock, toBlock)
}

func (s *BridgeSync) GetBridgesPublished(ctx context.Context, fromBlock, toBlock uint64) ([]Bridge, error) {
if s.processor.isHalted() {
return nil, sync.ErrInconsistentState
}
return s.processor.GetBridgesPublished(ctx, fromBlock, toBlock)
}

func (s *BridgeSync) GetProof(ctx context.Context, depositCount uint32, localExitRoot common.Hash) (tree.Proof, error) {
if s.processor.isHalted() {
return tree.Proof{}, sync.ErrInconsistentState
}
return s.processor.exitTree.GetProof(ctx, depositCount, localExitRoot)
}

func (s *BridgeSync) GetBlockByLER(ctx context.Context, ler common.Hash) (uint64, error) {
if s.processor.isHalted() {
return 0, sync.ErrInconsistentState
}
root, err := s.processor.exitTree.GetRootByHash(ctx, ler)
if err != nil {
return 0, err
Expand All @@ -203,6 +224,9 @@ func (s *BridgeSync) GetBlockByLER(ctx context.Context, ler common.Hash) (uint64
}

func (s *BridgeSync) GetRootByLER(ctx context.Context, ler common.Hash) (*tree.Root, error) {
if s.processor.isHalted() {
return nil, sync.ErrInconsistentState
}
root, err := s.processor.exitTree.GetRootByHash(ctx, ler)
if err != nil {
return root, err
Expand All @@ -212,6 +236,9 @@ func (s *BridgeSync) GetRootByLER(ctx context.Context, ler common.Hash) (*tree.R

// GetExitRootByIndex returns the root of the exit tree at the moment the leaf with the given index was added
func (s *BridgeSync) GetExitRootByIndex(ctx context.Context, index uint32) (tree.Root, error) {
if s.processor.isHalted() {
return tree.Root{}, sync.ErrInconsistentState
}
return s.processor.exitTree.GetRootByIndex(ctx, index)
}

Expand Down
10 changes: 5 additions & 5 deletions bridgesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (

func TestBridgeEventE2E(t *testing.T) {
const (
totalBridges = 100
totalReorgs = 10
totalBridges = 1000
totalReorgs = 700
maxReorgDepth = 5
reorgEveryXIterations = 10
reorgEveryXIterations = 7 // every X blocks go back [1,maxReorgDepth] blocks
)
env := helpers.NewE2EEnvWithEVML2(t)
ctx := context.Background()
Expand Down Expand Up @@ -48,7 +48,7 @@ func TestBridgeEventE2E(t *testing.T) {
true, nil,
)
require.NoError(t, err)
helpers.CommitBlocks(t, env.L1Client, 1, time.Millisecond*100)
helpers.CommitBlocks(t, env.L1Client, 1, time.Millisecond)
bn, err := env.L1Client.Client().BlockNumber(ctx)
require.NoError(t, err)
bridge.BlockNum = bn
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestBridgeEventE2E(t *testing.T) {
}

// Wait for syncer to catch up
time.Sleep(time.Second) // sleeping since the processor could be up to date, but have pending reorgs
time.Sleep(time.Second * 2) // sleeping since the processor could be up to date, but have pending reorgs
lb, err := env.L1Client.Client().BlockNumber(ctx)
require.NoError(t, err)
helpers.RequireProcessorUpdated(t, env.BridgeL1Sync, lb)
Expand Down
40 changes: 35 additions & 5 deletions bridgesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"math/big"
mutex "sync"

"github.com/0xPolygon/cdk/bridgesync/migrations"
"github.com/0xPolygon/cdk/db"
Expand Down Expand Up @@ -99,9 +100,12 @@ type Event struct {
}

type processor struct {
db *sql.DB
exitTree *tree.AppendOnlyTree
log *log.Logger
db *sql.DB
exitTree *tree.AppendOnlyTree
log *log.Logger
mu mutex.RWMutex
halted bool
haltedReason string
}

func newProcessor(dbPath, loggerPrefix string) (*processor, error) {
Expand Down Expand Up @@ -243,7 +247,11 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
}
}()

_, err = tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock)
res, err := tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
Expand All @@ -254,6 +262,12 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
if err := tx.Commit(); err != nil {
return err
}
if rowsAffected > 0 {
p.mu.Lock()
defer p.mu.Unlock()
p.halted = false
p.haltedReason = ""
}

log.Debug("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx BRIDGE SYNC xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx>")
return nil
Expand All @@ -262,6 +276,10 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
// ProcessBlock process the events of the block to build the exit tree
// and updates the last processed block (can be called without events for that purpose)
func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
if p.isHalted() {
log.Errorf("processor is halted due to: %s", p.haltedReason)
return sync.ErrInconsistentState
}
tx, err := db.NewTx(ctx, p.db)
if err != nil {
return err
Expand Down Expand Up @@ -289,7 +307,13 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
Index: event.Bridge.DepositCount,
Hash: event.Bridge.Hash(),
}); err != nil {
return err
if errors.Is(err, tree.ErrInvalidIndex) {
p.mu.Lock()
p.halted = true
p.haltedReason = fmt.Sprintf("error adding leaf to the exit tree: %v", err)
p.mu.Unlock()
}
return sync.ErrInconsistentState
}
if err = meddler.Insert(tx, "bridge", event.Bridge); err != nil {
return err
Expand Down Expand Up @@ -377,3 +401,9 @@ func DecodeGlobalIndex(globalIndex *big.Int) (mainnetFlag bool,
func convertBytesToUint32(bytes []byte) uint32 {
return uint32(big.NewInt(0).SetBytes(bytes).Uint64())
}

func (p *processor) isHalted() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.halted
}
24 changes: 13 additions & 11 deletions db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package db
import (
"database/sql"
"errors"
"fmt"

_ "github.com/mattn/go-sqlite3"
)
Expand All @@ -17,17 +18,18 @@ var (

// NewSQLiteDB creates a new SQLite DB
func NewSQLiteDB(dbPath string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return nil, err
}
_, err = db.Exec(`
PRAGMA foreign_keys = ON;
pragma journal_mode = WAL;
pragma synchronous = normal;
pragma journal_size_limit = 6144000;
`)
return db, err
return sql.Open("sqlite3", fmt.Sprintf("file:%s?_txlock=exclusive&_foreign_keys=on", dbPath))
// db,err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_foreign_keys=on", dbPath))
// if err != nil {
// return nil, err
// }
// _, err = db.Exec(`
// PRAGMA foreign_keys = ON;
// pragma journal_mode = WAL;
// pragma synchronous = normal;
// pragma journal_size_limit = 6144000;
// `)
// return db, err
}

func ReturnErrNotFound(err error) error {
Expand Down
8 changes: 4 additions & 4 deletions l1infotreesync/cascade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
)

func TestCascade(t *testing.T) {
dbPath := path.Join(t.TempDir(), "file::memory:?cache=shared")
dbPath := path.Join(t.TempDir(), "TestCascade.sqlite")
p, err := newProcessor(dbPath)
require.NoError(t, err)
ctx := context.Background()
for i := 1; i < 10000; i++ {
for i := 1; i < 10_000; i++ {
// insert block and info
tx, err := db.NewTx(ctx, p.db)
require.NoError(t, err)
Expand All @@ -42,13 +42,13 @@ func TestCascade(t *testing.T) {
if i%3 == 0 {
tx, err = db.NewTx(ctx, p.db)
require.NoError(t, err)
_, err = tx.Exec(`delete from block where num >= $1;`, i-1)
_, err = tx.Exec(`delete from block where num >= $1;`, i)
require.NoError(t, err)
require.NoError(t, tx.Commit())
// assert that info table is empty
info, err = p.GetLastInfo()
require.NoError(t, err)
require.Equal(t, info.BlockNumber, uint64(i-2))
require.Equal(t, info.BlockNumber, uint64(i-1))
}
}
}
11 changes: 9 additions & 2 deletions l1infotreesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,19 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
}
}()

info := &L1InfoTreeLeaf{}
err = meddler.QueryRow(tx, info, `
SELECT * FROM l1info_leaf
ORDER BY block_num DESC, block_pos DESC
LIMIT 1;
`)
log.Debugf("info before delete: %d", info.BlockNumber)
log.Debug("xxxxxxxxxxxx going to delete")
res, err := tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock)
if err != nil {
return err
}
info := &L1InfoTreeLeaf{}
info = &L1InfoTreeLeaf{}
err = meddler.QueryRow(tx, info, `
SELECT * FROM l1info_leaf
ORDER BY block_num DESC, block_pos DESC
Expand All @@ -266,7 +273,7 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
panic("nope " + err.Error())
}
if info.BlockNumber >= firstReorgedBlock {
log.Fatal("on the tx 1 !!!!! unsuccessful reorg, l1 info table has invalid info")
log.Fatalf("on the tx: info block num before reorg: %d")
}
log.Debug("done deleting xxxxxxxxxxxx")

Expand Down
1 change: 1 addition & 0 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
headers := hdrs.getSorted()
for _, hdr := range headers {
// Get the actual header from the network or from the cache
var err error
headersCacheLock.Lock()
currentHeader, ok := headersCache[hdr.Num]
if !ok || currentHeader == nil {
Expand Down
15 changes: 15 additions & 0 deletions sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

const (
DefaultWaitPeriodBlockNotFound = time.Millisecond * 100
)

type EthClienter interface {
ethereum.LogFilterer
ethereum.BlockNumberReader
Expand Down Expand Up @@ -276,6 +280,17 @@ func (d *EVMDownloaderImplementation) GetBlockHeader(ctx context.Context, blockN
// context is canceled, we don't want to fatal on max attempts in this case
return EVMBlockHeader{}, true
}
if errors.Is(err, ethereum.NotFound) {
// block num can temporarly disappear from the execution client due to a reorg,
// in this case, we want to wait and not panic
log.Warnf("block %d not found on the ethereum client: %v", blockNum, err)
if d.rh.RetryAfterErrorPeriod != 0 {
time.Sleep(d.rh.RetryAfterErrorPeriod)
} else {
time.Sleep(DefaultWaitPeriodBlockNotFound)
}
continue
}

attempts++
d.log.Errorf("error getting block header for block %d, err: %v", blockNum, err)
Expand Down
2 changes: 1 addition & 1 deletion sync/evmdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, cancel context.CancelFun
return
}
attempts++
d.log.Errorf("error processing events for block %d, err: ", b.Num, err)
d.log.Errorf("error processing events for block %d, err: %v", b.Num, err)
d.rh.Handle("handleNewBlock", attempts)
} else {
succeed = true
Expand Down
13 changes: 11 additions & 2 deletions tree/appendonlytree.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import (
"fmt"

"github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/tree/types"
"github.com/ethereum/go-ethereum/common"
)

var (
ErrInvalidIndex = errors.New("invalid index")
)

// AppendOnlyTree is a tree where leaves are added sequentially (by index)
type AppendOnlyTree struct {
*Tree
Expand All @@ -35,10 +40,11 @@ func (t *AppendOnlyTree) AddLeaf(tx db.Txer, blockNum, blockPosition uint64, lea
return err
}
if int64(leaf.Index) != t.lastIndex+1 {
return fmt.Errorf(
log.Errorf(
"mismatched index. Expected: %d, actual: %d",
t.lastIndex+1, leaf.Index,
)
return ErrInvalidIndex
}
}
// Calculate new tree nodes
Expand Down Expand Up @@ -74,7 +80,10 @@ func (t *AppendOnlyTree) AddLeaf(tx db.Txer, blockNum, blockPosition uint64, lea
return err
}
t.lastIndex++
tx.AddRollbackCallback(func() { t.lastIndex-- })
tx.AddRollbackCallback(func() {
log.Debugf("decreasing index due to rollback")
t.lastIndex--
})
return nil
}

Expand Down

0 comments on commit dac8cfc

Please sign in to comment.