Skip to content

Commit

Permalink
InMem/Sync/IsMining replacement (#12028)
Browse files Browse the repository at this point in the history
closes #11774

---------

Co-authored-by: JkLondon <[email protected]>
  • Loading branch information
JkLondon and JkLondon authored Sep 19, 2024
1 parent 1dd4a55 commit d515ff6
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 68 deletions.
5 changes: 3 additions & 2 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,9 +1483,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
recents = bor.Recents
signatures = bor.Signatures
}
stages := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
stagesDefault := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
heimdallClient, recents, signatures, logger)
sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)
sync := stagedsync.New(cfg.Sync, stagesDefault, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ApplyingBlocks)

miner := stagedsync.NewMiningState(&cfg.Miner)
miningCancel := make(chan struct{})
Expand Down Expand Up @@ -1524,6 +1524,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
stagedsync.MiningUnwindOrder,
stagedsync.MiningPruneOrder,
logger,
stages.BlockProduction,
)

return engine, vmConfig, sync, miningSync, miner
Expand Down
23 changes: 12 additions & 11 deletions cmd/state/exec3/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package exec3

import (
"context"
"github.com/erigontech/erigon/eth/stagedsync/stages"
"sync"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -70,10 +71,10 @@ type Worker struct {

dirs datadir.Dirs

isMining bool
mode stages.Mode
}

func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, isMining bool) *Worker {
func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, mode stages.Mode) *Worker {
w := &Worker{
lock: lock,
logger: logger,
Expand All @@ -94,7 +95,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro

dirs: dirs,

isMining: isMining,
mode: mode,
}
w.taskGasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock())
w.vmCfg = vm.Config{Debug: true, Tracer: w.callTracer}
Expand Down Expand Up @@ -130,18 +131,18 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {

func (rw *Worker) Run() error {
for txTask, ok := rw.in.Next(rw.ctx); ok; txTask, ok = rw.in.Next(rw.ctx) {
rw.RunTxTask(txTask, rw.isMining)
rw.RunTxTask(txTask, rw.mode)
if err := rw.resultCh.Add(rw.ctx, txTask); err != nil {
return err
}
}
return nil
}

func (rw *Worker) RunTxTask(txTask *state.TxTask, isMining bool) {
func (rw *Worker) RunTxTask(txTask *state.TxTask, mode stages.Mode) {
rw.lock.Lock()
defer rw.lock.Unlock()
rw.RunTxTaskNoLock(txTask, isMining)
rw.RunTxTaskNoLock(txTask, mode)
}

// Needed to set history reader when need to offset few txs from block beginning and does not break processing,
Expand All @@ -163,7 +164,7 @@ func (rw *Worker) SetReader(reader state.ResettableStateReader) {
}
}

func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
if txTask.HistoryExecution && !rw.historyMode {
// in case if we cancelled execution and commitment happened in the middle of the block, we have to process block
// from the beginning until committed txNum and only then disable history mode.
Expand Down Expand Up @@ -232,7 +233,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
return core.SysCallContract(contract, data, rw.chainConfig, ibs, header, rw.engine, false /* constCall */)
}

if isMining {
if mode == stages.BlockProduction {
_, txTask.Txs, txTask.BlockReceipts, err = rw.engine.FinalizeAndAssemble(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, nil, rw.logger)
} else {
_, _, _, err = rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger)
Expand Down Expand Up @@ -298,7 +299,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
}
}

func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, isMining bool) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, mode stages.Mode) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
reconWorkers = make([]*Worker, workerCount)

resultChSize := workerCount * 8
Expand All @@ -309,7 +310,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workerCount; i++ {
reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)
reconWorkers[i].ResetState(rs, accumulator)
}
if background {
Expand All @@ -336,7 +337,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
//applyWorker.ResetTx(nil)
}
}
applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)

return reconWorkers, applyWorker, rws, clear, wait
}
11 changes: 6 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"cmp"
"encoding/json"
"fmt"
"github.com/erigontech/erigon/eth/stagedsync/stages"
"reflect"
"slices"
"time"
Expand Down Expand Up @@ -170,7 +171,7 @@ func ExecuteBlockEphemerally(

if !vmConfig.ReadOnly {
txs := block.Transactions()
if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, false, logger); err != nil {
if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, stages.ApplyingBlocks, logger); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -323,14 +324,14 @@ func FinalizeBlockExecution(
stateWriter state.StateWriter, cc *chain.Config,
ibs *state.IntraBlockState, receipts types.Receipts,
withdrawals []*types.Withdrawal, requests types.Requests, chainReader consensus.ChainReader,
isMining bool,
mode stages.Mode,
logger log.Logger,
) (newBlock *types.Block, newTxs types.Transactions, newReceipt types.Receipts, err error) {
syscall := func(contract libcommon.Address, data []byte) ([]byte, error) {
return SysCallContract(contract, data, cc, ibs, header, engine, false /* constCall */)
}

if isMining {
if mode == stages.BlockProduction {
newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, nil, logger)
} else {
var rss types.Requests
Expand Down Expand Up @@ -367,7 +368,7 @@ func InitializeBlockExecution(engine consensus.Engine, chain consensus.ChainHead
return nil
}

func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, isMining bool) error {
func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, mode stages.Mode) error {
if gasUsed != h.GasUsed {
return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x",
gasUsed, h.GasUsed, h.Number.Uint64(), h.Hash())
Expand All @@ -383,7 +384,7 @@ func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receip
}
receiptHash := types.DeriveSha(receipts)
if receiptHash != h.ReceiptHash {
if isMining {
if mode == stages.BlockProduction {
h.ReceiptHash = receiptHash
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore),
), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
logger)
logger, stages.BlockProduction)

var ethashApi *ethash.API
if casted, ok := backend.engine.(*ethash.Ethash); ok {
Expand Down Expand Up @@ -763,7 +763,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
),
stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd),
stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger)
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.BlockProduction)
// We start the mining step
if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil {
return nil, err
Expand Down Expand Up @@ -916,7 +916,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.syncPruneOrder = stagedsync.DefaultPruneOrder
}

backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)
backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger, stages.ApplyingBlocks)

hook := stages2.NewHook(backend.sentryCtx, backend.chainDB, backend.notifications, backend.stagedSync, backend.blockReader, backend.chainConfig, backend.logger, backend.sentriesClient.SetStatus)

Expand Down Expand Up @@ -944,7 +944,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

checkStateRoot := true
pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks)
backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)
executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)

Expand Down
Loading

0 comments on commit d515ff6

Please sign in to comment.