Skip to content

Commit

Permalink
Merge pull request #2714 from OffchainLabs/improve-blocksreexecutor-impl
Browse files Browse the repository at this point in the history
[config change] Improve BlocksReExecutor implementation
  • Loading branch information
joshuacolvin0 authored Nov 7, 2024
2 parents f57a0eb + a74d22a commit 7e547ad
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 66 deletions.
177 changes: 130 additions & 47 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,31 @@ import (
"math/rand"
"runtime"
"strings"
"sync"

"github.com/ethereum/go-ethereum/arbitrum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
)

type Config struct {
Enable bool `koanf:"enable"`
Mode string `koanf:"mode"`
StartBlock uint64 `koanf:"start-block"`
EndBlock uint64 `koanf:"end-block"`
Room int `koanf:"room"`
BlocksPerThread uint64 `koanf:"blocks-per-thread"`
Enable bool `koanf:"enable"`
Mode string `koanf:"mode"`
StartBlock uint64 `koanf:"start-block"`
EndBlock uint64 `koanf:"end-block"`
Room int `koanf:"room"`
MinBlocksPerThread uint64 `koanf:"min-blocks-per-thread"`
TrieCleanLimit int `koanf:"trie-clean-limit"`
}

func (c *Config) Validate() error {
Expand All @@ -48,10 +55,11 @@ var DefaultConfig = Config{
}

var TestConfig = Config{
Enable: true,
Mode: "full",
Room: runtime.NumCPU(),
BlocksPerThread: 10,
Enable: true,
Mode: "full",
Room: runtime.NumCPU(),
MinBlocksPerThread: 10,
TrieCleanLimit: 600,
}

func ConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -60,22 +68,28 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".start-block", DefaultConfig.StartBlock, "first block number of the block range for re-execution")
f.Uint64(prefix+".end-block", DefaultConfig.EndBlock, "last block number of the block range for re-execution")
f.Int(prefix+".room", DefaultConfig.Room, "number of threads to parallelize blocks re-execution")
f.Uint64(prefix+".blocks-per-thread", DefaultConfig.BlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
f.Uint64(prefix+".min-blocks-per-thread", DefaultConfig.MinBlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
f.Int(prefix+".trie-clean-limit", DefaultConfig.TrieCleanLimit, "memory allowance (MB) to use for caching trie nodes in memory")
}

type BlocksReExecutor struct {
stopwaiter.StopWaiter
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
blocksPerThread uint64
config *Config
db state.Database
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
minBlocksPerThread uint64
mutex sync.Mutex
}

func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor {
func New(c *Config, blockchain *core.BlockChain, ethDb ethdb.Database, fatalErrChan chan error) (*BlocksReExecutor, error) {
if blockchain.TrieDB().Scheme() == rawdb.PathScheme {
return nil, errors.New("blocksReExecutor not supported on pathdb")
}
start := c.StartBlock
end := c.EndBlock
chainStart := blockchain.Config().ArbitrumChainParams.GenesisBlockNum
Expand All @@ -92,13 +106,13 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
blocksPerThread := uint64(10000)
if c.BlocksPerThread != 0 {
blocksPerThread = c.BlocksPerThread
minBlocksPerThread := uint64(10000)
if c.MinBlocksPerThread != 0 {
minBlocksPerThread = c.MinBlocksPerThread
}
if c.Mode == "random" && end != start {
// Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly
rng := blocksPerThread
// Reexecute a range of 10000 or (non-zero) c.MinBlocksPerThread number of blocks between start to end picked randomly
rng := minBlocksPerThread
if rng > end-start {
rng = end - start
}
Expand All @@ -111,32 +125,46 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads when BlocksPerThread is zero
if c.BlocksPerThread == 0 {
// Divide work equally among available threads when MinBlocksPerThread is zero
if c.MinBlocksPerThread == 0 {
// #nosec G115
work := (end - start) / uint64(c.Room)
work := (end - start) / uint64(c.Room*2)
if work > 0 {
blocksPerThread = work
minBlocksPerThread = work
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
blocksPerThread: blocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
state, err := blockchain.StateAt(header.Root)
return state, arbitrum.NoopStateRelease, err
},
hashConfig := *hashdb.Defaults
hashConfig.CleanCacheSize = c.TrieCleanLimit * 1024 * 1024
trieConfig := triedb.Config{
Preimages: false,
HashDB: &hashConfig,
}
blocksReExecutor := &BlocksReExecutor{
config: c,
db: state.NewDatabaseWithConfig(ethDb, &trieConfig),
blockchain: blockchain,
currentBlock: end,
startBlock: start,
minBlocksPerThread: minBlocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
}
blocksReExecutor.stateFor = func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
blocksReExecutor.mutex.Lock()
defer blocksReExecutor.mutex.Unlock()
sdb, err := state.New(header.Root, blocksReExecutor.db, nil)
if err == nil {
_ = blocksReExecutor.db.TrieDB().Reference(header.Root, common.Hash{}) // Will be dereferenced later in advanceStateUpToBlock
return sdb, func() { blocksReExecutor.dereferenceRoot(header.Root) }, nil
}
return sdb, arbitrum.NoopStateRelease, err
}
return blocksReExecutor, nil
}

// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state
// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.minBlocksPerThread, currentBlock] to the last available valid state
func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 {
start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread)
start := arbmath.SaturatingUSub(currentBlock, s.minBlocksPerThread)
if start < s.startBlock {
start = s.startBlock
}
Expand All @@ -145,12 +173,10 @@ func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentB
s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err)
return s.startBlock
}
// NoOp
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
if err != nil {
log.Info("Starting reexecution of blocks against historic state", "stateAt", start, "startBlock", start+1, "endBlock", currentBlock)
if err := s.advanceStateUpToBlock(ctx, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, release); err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor errored advancing state from block %d to block %d, err: %w", start, currentBlock, err)
} else {
log.Info("Successfully reexecuted blocks against historic state", "stateAt", start, "startBlock", start+1, "endBlock", currentBlock)
Expand Down Expand Up @@ -199,3 +225,60 @@ func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
func (s *BlocksReExecutor) StopAndWait() {
s.StopWaiter.StopAndWait()
}

// dereferenceRoot calls Dereference for root on the re-executor's trie
// database while holding s.mutex. It is used as the body of the release
// functions handed out alongside states opened from s.db.
func (s *BlocksReExecutor) dereferenceRoot(root common.Hash) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	trieDB := s.db.TrieDB()
	// The error returned by Dereference is explicitly discarded here.
	_ = trieDB.Dereference(root)
}

// commitStateAndVerify commits statedb for blockNumber, checks that the
// resulting root equals expected, and then opens a fresh StateDB at that root
// backed by s.db.
//
// On success the committed root is referenced in the trie database and the
// returned release function dereferences it again via dereferenceRoot. On any
// failure a no-op release function is returned together with the error.
//
// The whole operation runs under s.mutex. The returned release function takes
// s.mutex itself, so it must not be invoked while the mutex is already held.
func (s *BlocksReExecutor) commitStateAndVerify(statedb *state.StateDB, expected common.Hash, blockNumber uint64) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	committedRoot, err := statedb.Commit(blockNumber, true)
	switch {
	case err != nil:
		return nil, arbitrum.NoopStateRelease, err
	case committedRoot != expected:
		return nil, arbitrum.NoopStateRelease, fmt.Errorf("bad root hash expected: %v got: %v", expected, committedRoot)
	}

	reopened, err := state.New(committedRoot, s.db, nil)
	if err != nil {
		return reopened, arbitrum.NoopStateRelease, err
	}

	// Pin the committed root; the error from Reference is explicitly discarded.
	_ = s.db.TrieDB().Reference(committedRoot, common.Hash{})
	release := func() { s.dereferenceRoot(committedRoot) }
	return reopened, release, nil
}

// advanceStateUpToBlock re-executes blocks one by one on top of state,
// starting with the block after lastAvailableHeader and stopping once the
// block numbered targetHeader.Number has been recreated.
//
// After every block the state is committed and verified against the block's
// root (see commitStateAndVerify). The release function of the previous state
// is then called and replaced by the one for the newly committed state.
//
// lastRelease is the release function for the initial state. This function
// takes responsibility for calling it: whichever release function is current
// when the function returns is invoked by the deferred call below, on success
// and on every error path.
//
// It returns an error if targetHeader is nil, if advancing or committing a
// block fails, if the final block hash differs from targetHeader's hash, or
// ctx.Err() if the context is cancelled before the target is reached.
func (s *BlocksReExecutor) advanceStateUpToBlock(ctx context.Context, state *state.StateDB, targetHeader *types.Header, lastAvailableHeader *types.Header, lastRelease arbitrum.StateReleaseFunc) error {
	// Register the release before anything below can fail, so the state
	// reference is dropped on every exit path. The closure reads lastRelease at
	// return time, so it always releases the most recent state.
	defer func() {
		lastRelease()
	}()
	// The caller passes the result of a header lookup by number without
	// checking it; report a missing header instead of dereferencing nil.
	if targetHeader == nil {
		return fmt.Errorf("target header not found while advancing state")
	}
	targetBlockNumber := targetHeader.Number.Uint64()
	blockToRecreate := lastAvailableHeader.Number.Uint64() + 1
	prevHash := lastAvailableHeader.Hash()
	var (
		stateRelease arbitrum.StateReleaseFunc
		block        *types.Block
		err          error
	)
	for ctx.Err() == nil {
		state, block, err = arbitrum.AdvanceStateByBlock(ctx, s.blockchain, state, blockToRecreate, prevHash, nil)
		if err != nil {
			return err
		}
		prevHash = block.Hash()
		state, stateRelease, err = s.commitStateAndVerify(state, block.Root(), block.NumberU64())
		if err != nil {
			return fmt.Errorf("failed committing state for block %d : %w", blockToRecreate, err)
		}
		// The new state is pinned by stateRelease; the previous one can go.
		lastRelease()
		lastRelease = stateRelease
		if blockToRecreate >= targetBlockNumber {
			if block.Hash() != targetHeader.Hash() {
				return fmt.Errorf("blockHash doesn't match when recreating number: %d expected: %v got: %v", blockToRecreate, targetHeader.Hash(), block.Hash())
			}
			return nil
		}
		blockToRecreate++
	}
	return ctx.Err()
}
6 changes: 5 additions & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,11 @@ func mainImpl() int {

var blocksReExecutor *blocksreexecutor.BlocksReExecutor
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
blocksReExecutor, err = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, chainDb, fatalErrChan)
if err != nil {
log.Error("error initializing blocksReExecutor", "err", err)
return 1
}
if nodeConfig.Init.ThenQuit {
if err := gethexec.PopulateStylusTargetCache(&nodeConfig.Execution.StylusTarget); err != nil {
log.Error("error populating stylus target cache", "err", err)
Expand Down
32 changes: 17 additions & 15 deletions execution/gethexec/block_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,21 @@ func NewBlockRecorder(config *BlockRecorderConfig, execEngine *ExecutionEngine,
return recorder
}

func stateLogFunc(targetHeader, header *types.Header, hasState bool) {
if targetHeader == nil || header == nil {
return
}
gap := targetHeader.Number.Int64() - header.Number.Int64()
step := int64(500)
stage := "computing state"
if !hasState {
step = 3000
stage = "looking for full block"
}
if (gap >= step) && (gap%step == 0) {
log.Info("Setting up validation", "stage", stage, "current", header.Number, "target", targetHeader.Number)
// stateLogFunc returns a progress-logging callback bound to targetHeader.
//
// The callback does nothing when either header is nil. Otherwise it computes
// the gap between the target block number and the given header's number, and
// logs when that gap is at least one step and an exact multiple of the step.
// The step is 500 when hasState is true and 3000 when it is false.
func stateLogFunc(targetHeader *types.Header) arbitrum.StateBuildingLogFunction {
	return func(header *types.Header, hasState bool) {
		if targetHeader == nil || header == nil {
			return
		}

		// Defaults describe the search phase; switch once a state is available.
		stage, step := "looking for full block", int64(3000)
		if hasState {
			stage, step = "computing state", int64(500)
		}

		gap := targetHeader.Number.Int64() - header.Number.Int64()
		if gap < step || gap%step != 0 {
			return
		}
		log.Info("Setting up validation", "stage", stage, "current", header.Number, "target", targetHeader.Number)
	}
}

Expand All @@ -109,7 +111,7 @@ func (r *BlockRecorder) RecordBlockCreation(
}
}

recordingdb, chaincontext, recordingKV, err := r.recordingDatabase.PrepareRecording(ctx, prevHeader, stateLogFunc)
recordingdb, chaincontext, recordingKV, err := r.recordingDatabase.PrepareRecording(ctx, prevHeader, stateLogFunc(prevHeader))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -321,7 +323,7 @@ func (r *BlockRecorder) PrepareForRecord(ctx context.Context, start, end arbutil
log.Warn("prepareblocks asked for non-found block", "hdrNum", hdrNum)
break
}
_, err := r.recordingDatabase.GetOrRecreateState(ctx, header, stateLogFunc)
_, err := r.recordingDatabase.GetOrRecreateState(ctx, header, stateLogFunc(header))
if err != nil {
log.Warn("prepareblocks failed to get state for block", "hdrNum", hdrNum, "err", err)
break
Expand Down
2 changes: 1 addition & 1 deletion go-ethereum
8 changes: 6 additions & 2 deletions system_tests/blocks_reexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
blocksreexecutor "github.com/offchainlabs/nitro/blocks_reexecutor"
)

Expand All @@ -13,6 +14,7 @@ func TestBlocksReExecutorModes(t *testing.T) {
defer cancel()

builder := NewNodeBuilder(ctx).DefaultConfig(t, false)
builder.execConfig.Caching.StateScheme = rawdb.HashScheme
cleanup := builder.Build(t)
defer cleanup()

Expand All @@ -37,7 +39,8 @@ func TestBlocksReExecutorModes(t *testing.T) {

// Reexecute blocks at mode full
success := make(chan struct{})
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull, err := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
Require(t, err)
executorFull.Start(ctx, success)
select {
case err := <-feedErrChan:
Expand All @@ -49,7 +52,8 @@ func TestBlocksReExecutorModes(t *testing.T) {
success = make(chan struct{})
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom, err := blocksreexecutor.New(c, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
Require(t, err)
executorRandom.Start(ctx, success)
select {
case err := <-feedErrChan:
Expand Down

0 comments on commit 7e547ad

Please sign in to comment.