Skip to content

Commit

Permalink
backport program cache for ans
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue committed May 1, 2024
1 parent 12f3237 commit aa22a87
Show file tree
Hide file tree
Showing 19 changed files with 676 additions and 93 deletions.
49 changes: 45 additions & 4 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/onflow/flow-go/engine/common/requester"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/execution/computation/query"
"github.com/onflow/flow-go/fvm/storage/derived"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/bootstrap"
Expand Down Expand Up @@ -155,6 +156,7 @@ type AccessNodeConfig struct {
scriptExecMaxBlock uint64
registerCacheType string
registerCacheSize uint
programCacheSize uint
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -251,6 +253,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
}
}

Expand Down Expand Up @@ -803,6 +806,11 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.Storage.RegisterIndex = registers
}

indexerDerivedChainData, queryDerivedChainData, err := builder.buildDerivedChainData()
if err != nil {
return nil, fmt.Errorf("could not create derived chain data: %w", err)
}

indexerCore, err := indexer.New(
builder.Logger,
metrics.NewExecutionStateIndexerCollector(),
Expand All @@ -813,6 +821,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.Storage.Collections,
builder.Storage.Transactions,
builder.Storage.LightTransactionResults,
builder.RootChainID.Chain(),
indexerDerivedChainData,
builder.collectionExecutedMetric,
)
if err != nil {
Expand All @@ -839,18 +849,17 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess

// create script execution module, this depends on the indexer being initialized and the
// having the register storage bootstrapped
scripts, err := execution.NewScripts(
scripts := execution.NewScripts(
builder.Logger,
metrics.NewExecutionCollector(builder.Tracer),
builder.RootChainID,
query.NewProtocolStateWrapper(builder.State),
builder.Storage.Headers,
builder.ExecutionIndexerCore.RegisterValue,
builder.scriptExecutorConfig,
queryDerivedChainData,
builder.programCacheSize > 0,
)
if err != nil {
return nil, err
}

err = builder.ScriptExecutor.Initialize(builder.ExecutionIndexer, scripts)
if err != nil {
Expand Down Expand Up @@ -968,6 +977,34 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return builder
}

// buildDerivedChainData creates the derived chain data for the indexer and the query engine
// If program caching is disabled, the function will return nil for the indexer cache, and a
// derived chain data object for the query engine cache.
func (builder *FlowAccessNodeBuilder) buildDerivedChainData() (
indexerCache *derived.DerivedChainData,
queryCache *derived.DerivedChainData,
err error,
) {
cacheSize := builder.programCacheSize

// the underlying cache requires size > 0. no data will be written so 1 is fine.
if cacheSize == 0 {
cacheSize = 1
}

derivedChainData, err := derived.NewDerivedChainData(cacheSize)
if err != nil {
return nil, nil, err
}

// writes are done by the indexer. using a nil value effectively disables writes to the cache.
if builder.programCacheSize == 0 {
return nil, derivedChainData, nil
}

return derivedChainData, derivedChainData, nil
}

func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder {
dist := consensuspubsub.NewFollowerDistributor()
dist.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
Expand Down Expand Up @@ -1229,6 +1266,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"register-cache-size",
defaultConfig.registerCacheSize,
"number of registers to cache for script execution. default: 0 (no cache)")
flags.UintVar(&builder.programCacheSize,
"program-cache-size",
defaultConfig.programCacheSize,
"[experimental] number of blocks to cache for cadence programs. use 0 to disable cache. default: 0. Note: this is an experimental feature and may cause nodes to become unstable under certain workloads. Use with caution.")
}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down
16 changes: 12 additions & 4 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/onflow/flow-go/engine/common/follower"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/execution/computation/query"
"github.com/onflow/flow-go/fvm/storage/derived"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/bootstrap"
Expand Down Expand Up @@ -1300,6 +1301,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.Storage.Collections,
builder.Storage.Transactions,
builder.Storage.LightTransactionResults,
builder.RootChainID.Chain(),
nil,
collectionExecutedMetric,
)
if err != nil {
Expand Down Expand Up @@ -1329,20 +1332,25 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return nil, err
}

// cache is disabled on observers for now
derivedChainData, err := derived.NewDerivedChainData(1)
if err != nil {
return nil, err
}

// create script execution module, this depends on the indexer being initialized and the
// having the register storage bootstrapped
scripts, err := execution.NewScripts(
scripts := execution.NewScripts(
builder.Logger,
metrics.NewExecutionCollector(builder.Tracer),
builder.RootChainID,
query.NewProtocolStateWrapper(builder.State),
builder.Storage.Headers,
builder.ExecutionIndexerCore.RegisterValue,
builder.scriptExecutorConfig,
derivedChainData,
false,
)
if err != nil {
return nil, err
}

err = builder.ScriptExecutor.Initialize(builder.ExecutionIndexer, scripts)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions engine/execution/computation/computer/computer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ func NewBlockComputer(
if maxConcurrency < 1 {
return nil, fmt.Errorf("invalid maxConcurrency: %d", maxConcurrency)
}

// this is a safeguard to prevent scripts from writing to the program cache on Execution nodes.
// writes are only allowed by transactions.
if vmCtx.AllowProgramCacheWritesInScripts {
return nil, fmt.Errorf("program cache writes are not allowed in scripts on Execution nodes")
}

systemChunkCtx := SystemChunkContext(vmCtx)
vmCtx = fvm.NewContextFromParent(
vmCtx,
Expand Down
12 changes: 12 additions & 0 deletions fvm/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Context struct {
tracing.TracerSpan

environment.EnvironmentParams

// AllowProgramCacheWritesInScripts determines if the program cache can be written to in scripts
// By default, the program cache is only updated by transactions.
AllowProgramCacheWritesInScripts bool
}

// NewContext initializes a new execution context with the provided options.
Expand Down Expand Up @@ -375,3 +379,11 @@ func WithEVMEnabled(enabled bool) Option {
return ctx
}
}

// WithAllowProgramCacheWritesInScriptsEnabled enables caching of programs accessed by scripts
func WithAllowProgramCacheWritesInScriptsEnabled(enabled bool) Option {
return func(ctx Context) Context {
ctx.AllowProgramCacheWritesInScripts = enabled
return ctx
}
}
9 changes: 7 additions & 2 deletions fvm/fvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/onflow/cadence"

"github.com/onflow/flow-go/fvm/environment"
errors "github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/fvm/meter"
"github.com/onflow/flow-go/fvm/storage"
"github.com/onflow/flow-go/fvm/storage/logical"
Expand Down Expand Up @@ -161,7 +161,12 @@ func (vm *VirtualMachine) Run(
var err error
switch proc.Type() {
case ScriptProcedureType:
storageTxn = blockDatabase.NewSnapshotReadTransaction(stateParameters)
if ctx.AllowProgramCacheWritesInScripts {
// if configured, allow scripts to update the programs cache
storageTxn, err = blockDatabase.NewCachingSnapshotReadTransaction(stateParameters)
} else {
storageTxn = blockDatabase.NewSnapshotReadTransaction(stateParameters)
}
case TransactionProcedureType, BootstrapProcedureType:
storageTxn, err = blockDatabase.NewTransaction(
proc.ExecutionTime(),
Expand Down
12 changes: 12 additions & 0 deletions fvm/storage/block_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (database *BlockDatabase) NewTransaction(
}, nil
}

// NewSnapshotReadTransaction creates a new readonly transaction.
func (database *BlockDatabase) NewSnapshotReadTransaction(
parameters state.StateParameters,
) Transaction {
Expand All @@ -78,6 +79,17 @@ func (database *BlockDatabase) NewSnapshotReadTransaction(
}
}

// NewCachingSnapshotReadTransaction creates a new readonly transaction that allows writing to the
// derived transaction data table.
func (database *BlockDatabase) NewCachingSnapshotReadTransaction(
parameters state.StateParameters,
) (Transaction, error) {
return &transaction{
TransactionData: database.BlockData.NewCachingSnapshotReadTransactionData(parameters),
DerivedTransactionData: database.DerivedBlockData.NewCachingSnapshotReadDerivedTransactionData(),
}, nil
}

func (txn *transaction) Validate() error {
err := txn.DerivedTransactionData.Validate()
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions fvm/storage/derived/derived_block_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ func (block *DerivedBlockData) NewChildDerivedBlockData() *DerivedBlockData {
}

func (block *DerivedBlockData) NewSnapshotReadDerivedTransactionData() *DerivedTransactionData {
txnPrograms := block.programs.NewSnapshotReadTableTransaction()

txnMeterParamOverrides := block.meterParamOverrides.NewSnapshotReadTableTransaction()
return &DerivedTransactionData{
programs: block.programs.NewSnapshotReadTableTransaction(),
meterParamOverrides: block.meterParamOverrides.NewSnapshotReadTableTransaction(),
}
}

func (block *DerivedBlockData) NewCachingSnapshotReadDerivedTransactionData() *DerivedTransactionData {
return &DerivedTransactionData{
programs: txnPrograms,
meterParamOverrides: txnMeterParamOverrides,
programs: block.programs.NewCachingSnapshotReadTableTransaction(),
meterParamOverrides: block.meterParamOverrides.NewCachingSnapshotReadTableTransaction(),
}
}

Expand Down
32 changes: 25 additions & 7 deletions fvm/storage/derived/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ type TableTransaction[TKey comparable, TVal any] struct {
// When isSnapshotReadTransaction is true, invalidators must be empty.
isSnapshotReadTransaction bool
invalidators chainedTableInvalidators[TKey, TVal]

// ignoreLatestCommitExecutionTime is used to bypass latestCommitExecutionTime checks during
// commit. This is used when operating in caching mode with scripts since "commits" are all done
// at the end of the block and are not expected to progress the execution time.
ignoreLatestCommitExecutionTime bool
}

func NewEmptyTable[
Expand Down Expand Up @@ -270,6 +275,7 @@ func (table *DerivedDataTable[TKey, TVal]) commit(
defer table.lock.Unlock()

if !txn.isSnapshotReadTransaction &&
!txn.ignoreLatestCommitExecutionTime &&
table.latestCommitExecutionTime+1 < txn.snapshotTime {

return fmt.Errorf(
Expand Down Expand Up @@ -328,22 +334,33 @@ func (table *DerivedDataTable[TKey, TVal]) newTableTransaction(
snapshotTime logical.Time,
executionTime logical.Time,
isSnapshotReadTransaction bool,
ignoreLatestCommitExecutionTime bool,
) *TableTransaction[TKey, TVal] {
return &TableTransaction[TKey, TVal]{
table: table,
snapshotTime: snapshotTime,
executionTime: executionTime,
toValidateTime: snapshotTime,
readSet: map[TKey]*invalidatableEntry[TVal]{},
writeSet: map[TKey]*invalidatableEntry[TVal]{},
isSnapshotReadTransaction: isSnapshotReadTransaction,
table: table,
snapshotTime: snapshotTime,
executionTime: executionTime,
toValidateTime: snapshotTime,
readSet: map[TKey]*invalidatableEntry[TVal]{},
writeSet: map[TKey]*invalidatableEntry[TVal]{},
isSnapshotReadTransaction: isSnapshotReadTransaction,
ignoreLatestCommitExecutionTime: ignoreLatestCommitExecutionTime,
}
}

func (table *DerivedDataTable[TKey, TVal]) NewSnapshotReadTableTransaction() *TableTransaction[TKey, TVal] {
return table.newTableTransaction(
logical.EndOfBlockExecutionTime,
logical.EndOfBlockExecutionTime,
true,
false)
}

func (table *DerivedDataTable[TKey, TVal]) NewCachingSnapshotReadTableTransaction() *TableTransaction[TKey, TVal] {
return table.newTableTransaction(
logical.EndOfBlockExecutionTime,
logical.EndOfBlockExecutionTime,
false,
true)
}

Expand Down Expand Up @@ -372,6 +389,7 @@ func (table *DerivedDataTable[TKey, TVal]) NewTableTransaction(
return table.newTableTransaction(
snapshotTime,
executionTime,
false,
false), nil
}

Expand Down
11 changes: 10 additions & 1 deletion fvm/storage/primary/block_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (block *BlockData) NewTransactionData(
executionTime > logical.LargestNormalTransactionExecutionTime {

return nil, fmt.Errorf(
"invalid tranaction: execution time out of bound")
"invalid transaction: execution time out of bound")
}

txn := block.newTransactionData(
Expand All @@ -104,6 +104,15 @@ func (block *BlockData) NewTransactionData(
return txn, nil
}

func (block *BlockData) NewCachingSnapshotReadTransactionData(
parameters state.StateParameters,
) *TransactionData {
return block.newTransactionData(
false,
logical.EndOfBlockExecutionTime,
parameters)
}

func (block *BlockData) NewSnapshotReadTransactionData(
parameters state.StateParameters,
) *TransactionData {
Expand Down
Loading

0 comments on commit aa22a87

Please sign in to comment.